From 81ed54aa61a0ff6e94de6e52709d04c1db0a0d44 Mon Sep 17 00:00:00 2001
From: shenlong <139912620+shenlong-tanwen@users.noreply.github.com>
Date: Thu, 17 Apr 2025 20:55:27 +0530
Subject: [PATCH] feat: user sync stream (#16862)

* refactor: user entity

* chore: rebase fixes

* refactor: remove int user Id

* refactor: migrate store userId from int to string

* refactor: rename uid to id

* feat: drift

* pr feedback

* refactor: move common overrides to mixin

* refactor: remove int user Id

* refactor: migrate store userId from int to string

* refactor: rename uid to id

* feat: user & partner sync stream

* pr changes

* refactor: sync service and add tests

* chore: remove generated change

* chore: move sync model

* rebase: convert string ids to byte uuids

* rebase

* add processing logs

* batch db calls

* rewrite isolate manager

* rewrite with worker_manager

* misc fixes

* add sync order test

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: Alex <alex.tran1502@gmail.com>
---
 mobile/analysis_options.yaml                  |   8 +-
 mobile/devtools_options.yaml                  |   1 +
 mobile/lib/constants/constants.dart           |   4 +
 .../domain/interfaces/sync_api.interface.dart |   7 +-
 .../interfaces/sync_stream.interface.dart     |  10 +
 .../domain/models/sync/sync_event.model.dart  |  14 -
 .../lib/domain/models/sync_event.model.dart   |  13 +
 .../domain/services/sync_stream.service.dart  | 217 +++++++--
 mobile/lib/domain/utils/background_sync.dart  |  37 ++
 mobile/lib/extensions/string_extensions.dart  |   9 +
 .../repositories/sync_api.repository.dart     |  93 ++--
 .../repositories/sync_stream.repository.dart  | 104 ++++
 mobile/lib/main.dart                          |   5 +-
 .../providers/background_sync.provider.dart   |   8 +
 .../infrastructure/cancel.provider.dart       |  12 +
 .../providers/infrastructure/db.provider.dart |   9 +
 .../infrastructure/sync_stream.provider.dart  |  27 +-
 mobile/lib/services/auth.service.dart         |  11 +-
 mobile/lib/utils/bootstrap.dart               |   6 +-
 mobile/lib/utils/isolate.dart                 |  69 +++
 mobile/pubspec.lock                           |  10 +-
 mobile/pubspec.yaml                           |   2 +
 mobile/test/domain/service.mock.dart          |   3 +
 .../services/sync_stream_service_test.dart    | 443 ++++++++++++++++++
 mobile/test/fixtures/sync_stream.stub.dart    |  45 ++
 .../test/infrastructure/repository.mock.dart  |   6 +
 mobile/test/service.mocks.dart                |   1 -
 mobile/test/services/auth.service_test.dart   |   8 +
 28 files changed, 1065 insertions(+), 117 deletions(-)
 create mode 100644 mobile/lib/domain/interfaces/sync_stream.interface.dart
 delete mode 100644 mobile/lib/domain/models/sync/sync_event.model.dart
 create mode 100644 mobile/lib/domain/models/sync_event.model.dart
 create mode 100644 mobile/lib/domain/utils/background_sync.dart
 create mode 100644 mobile/lib/infrastructure/repositories/sync_stream.repository.dart
 create mode 100644 mobile/lib/providers/background_sync.provider.dart
 create mode 100644 mobile/lib/providers/infrastructure/cancel.provider.dart
 create mode 100644 mobile/lib/utils/isolate.dart
 create mode 100644 mobile/test/domain/services/sync_stream_service_test.dart
 create mode 100644 mobile/test/fixtures/sync_stream.stub.dart

diff --git a/mobile/analysis_options.yaml b/mobile/analysis_options.yaml
index 04f3145908..854f852e3c 100644
--- a/mobile/analysis_options.yaml
+++ b/mobile/analysis_options.yaml
@@ -35,6 +35,7 @@ linter:
 analyzer:
   exclude:
     - openapi/**
+    - build/**
     - lib/generated_plugin_registrant.dart
     - lib/**/*.g.dart
     - lib/**/*.drift.dart
@@ -92,6 +93,9 @@ custom_lint:
       allowed:
         # required / wanted
         - lib/repositories/*_api.repository.dart
+        - lib/domain/models/sync_event.model.dart
+        - lib/{domain,infrastructure}/**/sync_stream.*
+        - lib/{domain,infrastructure}/**/sync_api.*
         - lib/infrastructure/repositories/*_api.repository.dart
         - lib/infrastructure/utils/*.converter.dart
         # acceptable exceptions for the time being
@@ -144,7 +148,9 @@ dart_code_metrics:
     - avoid-global-state
     - avoid-inverted-boolean-checks
     - avoid-late-final-reassignment
-    - avoid-local-functions
+    - avoid-local-functions:
+        exclude:
+          - test/**.dart
     - avoid-negated-conditions
     - avoid-nested-streams-and-futures
     - avoid-referencing-subclasses
diff --git a/mobile/devtools_options.yaml b/mobile/devtools_options.yaml
index fa0b357c4f..f592d85a9b 100644
--- a/mobile/devtools_options.yaml
+++ b/mobile/devtools_options.yaml
@@ -1,3 +1,4 @@
 description: This file stores settings for Dart & Flutter DevTools.
 documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states
 extensions:
+  - drift: true
\ No newline at end of file
diff --git a/mobile/lib/constants/constants.dart b/mobile/lib/constants/constants.dart
index 83d540d54c..a91e0a715d 100644
--- a/mobile/lib/constants/constants.dart
+++ b/mobile/lib/constants/constants.dart
@@ -5,5 +5,9 @@ const double downloadFailed = -2;
 // Number of log entries to retain on app start
 const int kLogTruncateLimit = 250;
 
+// Sync
+const int kSyncEventBatchSize = 5000;
+
+// Hash batch limits
 const int kBatchHashFileLimit = 128;
 const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB
diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart
index fb8f1aa46e..44e22c5894 100644
--- a/mobile/lib/domain/interfaces/sync_api.interface.dart
+++ b/mobile/lib/domain/interfaces/sync_api.interface.dart
@@ -1,7 +1,8 @@
-import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
+import 'package:immich_mobile/domain/models/sync_event.model.dart';
+import 'package:openapi/api.dart';
 
 abstract interface class ISyncApiRepository {
-  Future<void> ack(String data);
+  Future<void> ack(List<String> data);
 
-  Stream<List<SyncEvent>> watchUserSyncEvent();
+  Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type);
 }
diff --git a/mobile/lib/domain/interfaces/sync_stream.interface.dart b/mobile/lib/domain/interfaces/sync_stream.interface.dart
new file mode 100644
index 0000000000..f9c52d7ee0
--- /dev/null
+++ b/mobile/lib/domain/interfaces/sync_stream.interface.dart
@@ -0,0 +1,10 @@
+import 'package:immich_mobile/domain/interfaces/db.interface.dart';
+import 'package:openapi/api.dart';
+
+abstract interface class ISyncStreamRepository implements IDatabaseRepository {
+  Future<bool> updateUsersV1(Iterable<SyncUserV1> data);
+  Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data);
+
+  Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data);
+  Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data);
+}
diff --git a/mobile/lib/domain/models/sync/sync_event.model.dart b/mobile/lib/domain/models/sync/sync_event.model.dart
deleted file mode 100644
index f4642d59cf..0000000000
--- a/mobile/lib/domain/models/sync/sync_event.model.dart
+++ /dev/null
@@ -1,14 +0,0 @@
-class SyncEvent {
-  // dynamic
-  final dynamic data;
-
-  final String ack;
-
-  SyncEvent({
-    required this.data,
-    required this.ack,
-  });
-
-  @override
-  String toString() => 'SyncEvent(data: $data, ack: $ack)';
-}
diff --git a/mobile/lib/domain/models/sync_event.model.dart b/mobile/lib/domain/models/sync_event.model.dart
new file mode 100644
index 0000000000..2ad8a75fe1
--- /dev/null
+++ b/mobile/lib/domain/models/sync_event.model.dart
@@ -0,0 +1,13 @@
+import 'package:openapi/api.dart';
+
+class SyncEvent {
+  final SyncEntityType type;
+  // ignore: avoid-dynamic
+  final dynamic data;
+  final String ack;
+
+  const SyncEvent({required this.type, required this.data, required this.ack});
+
+  @override
+  String toString() => 'SyncEvent(type: $type, data: $data, ack: $ack)';
+}
diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart
index 72e29b3677..8d7d87e35e 100644
--- a/mobile/lib/domain/services/sync_stream.service.dart
+++ b/mobile/lib/domain/services/sync_stream.service.dart
@@ -1,49 +1,200 @@
+// ignore_for_file: avoid-passing-async-when-sync-expected
+
 import 'dart:async';
 
-import 'package:flutter/foundation.dart';
+import 'package:collection/collection.dart';
 import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
+import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
+import 'package:logging/logging.dart';
 import 'package:openapi/api.dart';
+import 'package:worker_manager/worker_manager.dart';
+
+const _kSyncTypeOrder = [
+  SyncEntityType.userDeleteV1,
+  SyncEntityType.userV1,
+  SyncEntityType.partnerDeleteV1,
+  SyncEntityType.partnerV1,
+  SyncEntityType.assetDeleteV1,
+  SyncEntityType.assetV1,
+  SyncEntityType.assetExifV1,
+  SyncEntityType.partnerAssetDeleteV1,
+  SyncEntityType.partnerAssetV1,
+  SyncEntityType.partnerAssetExifV1,
+];
 
 class SyncStreamService {
+  final Logger _logger = Logger('SyncStreamService');
+
   final ISyncApiRepository _syncApiRepository;
+  final ISyncStreamRepository _syncStreamRepository;
+  final bool Function()? _cancelChecker;
 
-  SyncStreamService(this._syncApiRepository);
+  SyncStreamService({
+    required ISyncApiRepository syncApiRepository,
+    required ISyncStreamRepository syncStreamRepository,
+    bool Function()? cancelChecker,
+  })  : _syncApiRepository = syncApiRepository,
+        _syncStreamRepository = syncStreamRepository,
+        _cancelChecker = cancelChecker;
 
-  StreamSubscription? _userSyncSubscription;
+  Future<bool> _handleSyncData(
+    SyncEntityType type,
+    // ignore: avoid-dynamic
+    Iterable<dynamic> data,
+  ) async {
+    if (data.isEmpty) {
+      _logger.warning("Received empty sync data for $type");
+      return false;
+    }
 
-  void syncUsers() {
-    _userSyncSubscription =
-        _syncApiRepository.watchUserSyncEvent().listen((events) async {
-      for (final event in events) {
-        if (event.data is SyncUserV1) {
-          final data = event.data as SyncUserV1;
-          debugPrint("User Update: $data");
+    _logger.fine("Processing sync data for $type of length ${data.length}");
 
-          // final user = await _userRepository.get(data.id);
-
-          // if (user == null) {
-          //   continue;
-          // }
-
-          // user.name = data.name;
-          // user.email = data.email;
-          // user.updatedAt = DateTime.now();
-
-          // await _userRepository.update(user);
-          // await _syncApiRepository.ack(event.ack);
-        }
-
-        if (event.data is SyncUserDeleteV1) {
-          final data = event.data as SyncUserDeleteV1;
-
-          debugPrint("User delete: $data");
-          // await _syncApiRepository.ack(event.ack);
-        }
+    try {
+      if (type == SyncEntityType.partnerV1) {
+        return await _syncStreamRepository.updatePartnerV1(data.cast());
       }
+
+      if (type == SyncEntityType.partnerDeleteV1) {
+        return await _syncStreamRepository.deletePartnerV1(data.cast());
+      }
+
+      if (type == SyncEntityType.userV1) {
+        return await _syncStreamRepository.updateUsersV1(data.cast());
+      }
+
+      if (type == SyncEntityType.userDeleteV1) {
+        return await _syncStreamRepository.deleteUsersV1(data.cast());
+      }
+    } catch (error, stack) {
+      _logger.severe("Error processing sync data for $type", error, stack);
+      return false;
+    }
+
+    _logger.warning("Unknown sync data type: $type");
+    return false;
+  }
+
+  Future<void> _syncEvent(List<SyncRequestType> types) {
+    _logger.info("Syncing Events: $types");
+    final streamCompleter = Completer();
+    bool shouldComplete = false;
+    // the onDone callback might fire before the events are processed
+    // the following flag ensures that the onDone callback is not called
+    // before the events are processed and also that events are processed sequentially
+    Completer? mutex;
+    StreamSubscription? subscription;
+    try {
+      subscription = _syncApiRepository.getSyncEvents(types).listen(
+        (events) async {
+          if (events.isEmpty) {
+            _logger.warning("Received empty sync events");
+            return;
+          }
+
+          // If previous events are still being processed, wait for them to finish
+          if (mutex != null) {
+            await mutex!.future;
+          }
+
+          if (_cancelChecker?.call() ?? false) {
+            _logger.info("Sync cancelled, stopping stream");
+            subscription?.cancel();
+            if (!streamCompleter.isCompleted) {
+              streamCompleter.completeError(
+                CanceledError(),
+                StackTrace.current,
+              );
+            }
+            return;
+          }
+
+          // Take control of the mutex and process the events
+          mutex = Completer();
+
+          try {
+            final eventsMap = events.groupListsBy((event) => event.type);
+            final Map<SyncEntityType, String> acks = {};
+
+            for (final type in _kSyncTypeOrder) {
+              final data = eventsMap[type];
+              if (data == null) {
+                continue;
+              }
+
+              if (_cancelChecker?.call() ?? false) {
+                _logger.info("Sync cancelled, stopping stream");
+                mutex?.complete();
+                mutex = null;
+                if (!streamCompleter.isCompleted) {
+                  streamCompleter.completeError(
+                    CanceledError(),
+                    StackTrace.current,
+                  );
+                }
+
+                return;
+              }
+
+              if (data.isEmpty) {
+                _logger.warning("Received empty sync events for $type");
+                continue;
+              }
+
+              if (await _handleSyncData(type, data.map((e) => e.data))) {
+                // ignore: avoid-unsafe-collection-methods
+                acks[type] = data.last.ack;
+              } else {
+                _logger.warning("Failed to handle sync events for $type");
+              }
+            }
+
+            if (acks.isNotEmpty) {
+              await _syncApiRepository.ack(acks.values.toList());
+            }
+            _logger.info("$types events processed");
+          } catch (error, stack) {
+            _logger.warning("Error handling sync events", error, stack);
+          } finally {
+            mutex?.complete();
+            mutex = null;
+          }
+
+          if (shouldComplete) {
+            _logger.info("Sync done, completing stream");
+            if (!streamCompleter.isCompleted) streamCompleter.complete();
+          }
+        },
+        onError: (error, stack) {
+          _logger.warning("Error in sync stream for $types", error, stack);
+          // Do not proceed if the stream errors
+          if (!streamCompleter.isCompleted) {
+            // ignore: avoid-missing-completer-stack-trace
+            streamCompleter.completeError(error, stack);
+          }
+        },
+        onDone: () {
+          _logger.info("$types stream done");
+          if (mutex == null && !streamCompleter.isCompleted) {
+            streamCompleter.complete();
+          } else {
+            // Marks the stream as done but does not complete the completer
+            // until the events are processed
+            shouldComplete = true;
+          }
+        },
+      );
+    } catch (error, stack) {
+      _logger.severe("Error starting sync stream", error, stack);
+      if (!streamCompleter.isCompleted) {
+        streamCompleter.completeError(error, stack);
+      }
+    }
+    return streamCompleter.future.whenComplete(() {
+      _logger.info("Sync stream completed");
+      return subscription?.cancel();
     });
   }
 
-  Future<void> dispose() async {
-    await _userSyncSubscription?.cancel();
-  }
+  Future<void> syncUsers() =>
+      _syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]);
 }
diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart
new file mode 100644
index 0000000000..0bd456f0bb
--- /dev/null
+++ b/mobile/lib/domain/utils/background_sync.dart
@@ -0,0 +1,37 @@
+// ignore_for_file: avoid-passing-async-when-sync-expected
+
+import 'dart:async';
+
+import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
+import 'package:immich_mobile/utils/isolate.dart';
+import 'package:worker_manager/worker_manager.dart';
+
+class BackgroundSyncManager {
+  Cancelable<void>? _userSyncTask;
+
+  BackgroundSyncManager();
+
+  Future<void> cancel() {
+    final futures = <Future>[];
+    if (_userSyncTask != null) {
+      futures.add(_userSyncTask!.future);
+    }
+    _userSyncTask?.cancel();
+    _userSyncTask = null;
+    return Future.wait(futures);
+  }
+
+  Future<void> syncUsers() {
+    if (_userSyncTask != null) {
+      return _userSyncTask!.future;
+    }
+
+    _userSyncTask = runInIsolateGentle(
+      computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(),
+    );
+    _userSyncTask!.whenComplete(() {
+      _userSyncTask = null;
+    });
+    return _userSyncTask!.future;
+  }
+}
diff --git a/mobile/lib/extensions/string_extensions.dart b/mobile/lib/extensions/string_extensions.dart
index 67411013ee..73c8c2d34c 100644
--- a/mobile/lib/extensions/string_extensions.dart
+++ b/mobile/lib/extensions/string_extensions.dart
@@ -1,3 +1,7 @@
+import 'dart:typed_data';
+
+import 'package:uuid/parsing.dart';
+
 extension StringExtension on String {
   String capitalize() {
     return split(" ")
@@ -29,3 +33,8 @@ extension DurationExtension on String {
     return int.parse(this);
   }
 }
+
+extension UUIDExtension on String {
+  Uint8List toUuidByte({bool shouldValidate = false}) =>
+      UuidParsing.parseAsByteList(this, validate: shouldValidate);
+}
diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart
index 88a6838c44..a26b867df6 100644
--- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart
+++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart
@@ -1,37 +1,36 @@
 import 'dart:async';
 import 'dart:convert';
 
-import 'package:flutter/foundation.dart';
-import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
-import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
-import 'package:immich_mobile/services/api.service.dart';
-import 'package:openapi/api.dart';
 import 'package:http/http.dart' as http;
+import 'package:immich_mobile/constants/constants.dart';
+import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
+import 'package:immich_mobile/domain/models/sync_event.model.dart';
+import 'package:immich_mobile/services/api.service.dart';
+import 'package:logging/logging.dart';
+import 'package:openapi/api.dart';
 
 class SyncApiRepository implements ISyncApiRepository {
+  final Logger _logger = Logger('SyncApiRepository');
   final ApiService _api;
-  const SyncApiRepository(this._api);
+  final int _batchSize;
+  SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize})
+      : _batchSize = batchSize;
 
   @override
-  Stream<List<SyncEvent>> watchUserSyncEvent() {
-    return _getSyncStream(
-      SyncStreamDto(types: [SyncRequestType.usersV1]),
-    );
+  Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type) {
+    return _getSyncStream(SyncStreamDto(types: type));
   }
 
   @override
-  Future<void> ack(String data) {
-    return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data]));
+  Future<void> ack(List<String> data) {
+    return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
   }
 
-  Stream<List<SyncEvent>> _getSyncStream(
-    SyncStreamDto dto, {
-    int batchSize = 5000,
-  }) async* {
+  Stream<List<SyncEvent>> _getSyncStream(SyncStreamDto dto) async* {
     final client = http.Client();
     final endpoint = "${_api.apiClient.basePath}/sync/stream";
 
-    final headers = <String, String>{
+    final headers = {
       'Content-Type': 'application/json',
       'Accept': 'application/jsonlines+json',
     };
@@ -61,52 +60,54 @@ class SyncApiRepository implements ISyncApiRepository {
 
       await for (final chunk in response.stream.transform(utf8.decoder)) {
         previousChunk += chunk;
-        final parts = previousChunk.split('\n');
+        final parts = previousChunk.toString().split('\n');
         previousChunk = parts.removeLast();
         lines.addAll(parts);
 
-        if (lines.length < batchSize) {
+        if (lines.length < _batchSize) {
           continue;
         }
 
-        yield await compute(_parseSyncResponse, lines);
+        yield _parseSyncResponse(lines);
         lines.clear();
       }
     } finally {
       if (lines.isNotEmpty) {
-        yield await compute(_parseSyncResponse, lines);
+        yield _parseSyncResponse(lines);
       }
       client.close();
     }
   }
+
+  List<SyncEvent> _parseSyncResponse(List<String> lines) {
+    final List<SyncEvent> data = [];
+
+    for (final line in lines) {
+      try {
+        final jsonData = jsonDecode(line);
+        final type = SyncEntityType.fromJson(jsonData['type'])!;
+        final dataJson = jsonData['data'];
+        final ack = jsonData['ack'];
+        final converter = _kResponseMap[type];
+        if (converter == null) {
+          _logger.warning("[_parseSyncResponse] Unknown type $type");
+          continue;
+        }
+
+        data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack));
+      } catch (error, stack) {
+        _logger.severe("[_parseSyncResponse] Error parsing json", error, stack);
+      }
+    }
+
+    return data;
+  }
 }
 
+// ignore: avoid-dynamic
 const _kResponseMap = <SyncEntityType, Function(dynamic)>{
   SyncEntityType.userV1: SyncUserV1.fromJson,
   SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
+  SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
+  SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
 };
-
-// Need to be outside of the class to be able to use compute
-List<SyncEvent> _parseSyncResponse(List<String> lines) {
-  final List<SyncEvent> data = [];
-
-  for (var line in lines) {
-    try {
-      final jsonData = jsonDecode(line);
-      final type = SyncEntityType.fromJson(jsonData['type'])!;
-      final dataJson = jsonData['data'];
-      final ack = jsonData['ack'];
-      final converter = _kResponseMap[type];
-      if (converter == null) {
-        debugPrint("[_parseSyncReponse] Unknown type $type");
-        continue;
-      }
-
-      data.add(SyncEvent(data: converter(dataJson), ack: ack));
-    } catch (error, stack) {
-      debugPrint("[_parseSyncReponse] Error parsing json $error $stack");
-    }
-  }
-
-  return data;
-}
diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart
new file mode 100644
index 0000000000..a947a9a66b
--- /dev/null
+++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart
@@ -0,0 +1,104 @@
+import 'package:drift/drift.dart';
+import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
+import 'package:immich_mobile/extensions/string_extensions.dart';
+import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart';
+import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart';
+import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
+import 'package:logging/logging.dart';
+import 'package:openapi/api.dart';
+
+class DriftSyncStreamRepository extends DriftDatabaseRepository
+    implements ISyncStreamRepository {
+  final Logger _logger = Logger('DriftSyncStreamRepository');
+  final Drift _db;
+
+  DriftSyncStreamRepository(super.db) : _db = db;
+
+  @override
+  Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async {
+    try {
+      await _db.batch((batch) {
+        for (final user in data) {
+          batch.delete(
+            _db.userEntity,
+            UserEntityCompanion(id: Value(user.userId.toUuidByte())),
+          );
+        }
+      });
+      return true;
+    } catch (e, s) {
+      _logger.severe('Error while processing SyncUserDeleteV1', e, s);
+      return false;
+    }
+  }
+
+  @override
+  Future<bool> updateUsersV1(Iterable<SyncUserV1> data) async {
+    try {
+      await _db.batch((batch) {
+        for (final user in data) {
+          final companion = UserEntityCompanion(
+            name: Value(user.name),
+            email: Value(user.email),
+          );
+
+          batch.insert(
+            _db.userEntity,
+            companion.copyWith(id: Value(user.id.toUuidByte())),
+            onConflict: DoUpdate((_) => companion),
+          );
+        }
+      });
+      return true;
+    } catch (e, s) {
+      _logger.severe('Error while processing SyncUserV1', e, s);
+      return false;
+    }
+  }
+
+  @override
+  Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data) async {
+    try {
+      await _db.batch((batch) {
+        for (final partner in data) {
+          batch.delete(
+            _db.partnerEntity,
+            PartnerEntityCompanion(
+              sharedById: Value(partner.sharedById.toUuidByte()),
+              sharedWithId: Value(partner.sharedWithId.toUuidByte()),
+            ),
+          );
+        }
+      });
+      return true;
+    } catch (e, s) {
+      _logger.severe('Error while processing SyncPartnerDeleteV1', e, s);
+      return false;
+    }
+  }
+
+  @override
+  Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data) async {
+    try {
+      await _db.batch((batch) {
+        for (final partner in data) {
+          final companion =
+              PartnerEntityCompanion(inTimeline: Value(partner.inTimeline));
+
+          batch.insert(
+            _db.partnerEntity,
+            companion.copyWith(
+              sharedById: Value(partner.sharedById.toUuidByte()),
+              sharedWithId: Value(partner.sharedWithId.toUuidByte()),
+            ),
+            onConflict: DoUpdate((_) => companion),
+          );
+        }
+      });
+      return true;
+    } catch (e, s) {
+      _logger.severe('Error while processing SyncPartnerV1', e, s);
+      return false;
+    }
+  }
+}
diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart
index 1a434aa359..73af81d69d 100644
--- a/mobile/lib/main.dart
+++ b/mobile/lib/main.dart
@@ -11,6 +11,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart';
 import 'package:hooks_riverpod/hooks_riverpod.dart';
 import 'package:immich_mobile/constants/locales.dart';
 import 'package:immich_mobile/extensions/build_context_extensions.dart';
+import 'package:immich_mobile/generated/codegen_loader.g.dart';
 import 'package:immich_mobile/providers/app_life_cycle.provider.dart';
 import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart';
 import 'package:immich_mobile/providers/db.provider.dart';
@@ -31,13 +32,15 @@ import 'package:immich_mobile/utils/migration.dart';
 import 'package:intl/date_symbol_data_local.dart';
 import 'package:logging/logging.dart';
 import 'package:timezone/data/latest.dart';
-import 'package:immich_mobile/generated/codegen_loader.g.dart';
+import 'package:worker_manager/worker_manager.dart';
 
 void main() async {
   ImmichWidgetsBinding();
   final db = await Bootstrap.initIsar();
   await Bootstrap.initDomain(db);
   await initApp();
+  // Warm-up isolate pool for worker manager
+  await workerManager.init(dynamicSpawning: true);
   await migrateDatabaseIfNeeded(db);
   HttpOverrides.global = HttpSSLCertOverride();
 
diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart
new file mode 100644
index 0000000000..83d103bb3b
--- /dev/null
+++ b/mobile/lib/providers/background_sync.provider.dart
@@ -0,0 +1,8 @@
+import 'package:hooks_riverpod/hooks_riverpod.dart';
+import 'package:immich_mobile/domain/utils/background_sync.dart';
+
+final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
+  final manager = BackgroundSyncManager();
+  ref.onDispose(manager.cancel);
+  return manager;
+});
diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart
new file mode 100644
index 0000000000..6851861e1a
--- /dev/null
+++ b/mobile/lib/providers/infrastructure/cancel.provider.dart
@@ -0,0 +1,12 @@
+import 'package:hooks_riverpod/hooks_riverpod.dart';
+
+/// Provider holding a boolean function that returns true when cancellation is requested.
+/// A computation running in the isolate uses the function to implement cooperative cancellation.
+final cancellationProvider = Provider<bool Function()>(
+  // This will be overridden in the isolate's container.
+  // Throwing ensures it's not used without an override.
+  (ref) => throw UnimplementedError(
+    "cancellationProvider must be overridden in the isolate's ProviderContainer and not to be used in the root isolate",
+  ),
+  name: 'cancellationProvider',
+);
diff --git a/mobile/lib/providers/infrastructure/db.provider.dart b/mobile/lib/providers/infrastructure/db.provider.dart
index 84010b3b96..4eefbc556c 100644
--- a/mobile/lib/providers/infrastructure/db.provider.dart
+++ b/mobile/lib/providers/infrastructure/db.provider.dart
@@ -1,4 +1,7 @@
+import 'dart:async';
+
 import 'package:hooks_riverpod/hooks_riverpod.dart';
+import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
 import 'package:isar/isar.dart';
 import 'package:riverpod_annotation/riverpod_annotation.dart';
 
@@ -6,3 +9,9 @@ part 'db.provider.g.dart';
 
 @Riverpod(keepAlive: true)
 Isar isar(Ref ref) => throw UnimplementedError('isar');
+
+final driftProvider = Provider<Drift>((ref) {
+  final drift = Drift();
+  ref.onDispose(() => unawaited(drift.close()));
+  return drift;
+});
diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart
index 64f1a6cb05..e313982a30 100644
--- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart
+++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart
@@ -1,24 +1,23 @@
-import 'dart:async';
-
 import 'package:hooks_riverpod/hooks_riverpod.dart';
 import 'package:immich_mobile/domain/services/sync_stream.service.dart';
 import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
+import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
 import 'package:immich_mobile/providers/api.provider.dart';
+import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
+import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
 
 final syncStreamServiceProvider = Provider(
-  (ref) {
-    final instance = SyncStreamService(
-      ref.watch(syncApiRepositoryProvider),
-    );
-
-    ref.onDispose(() => unawaited(instance.dispose()));
-
-    return instance;
-  },
+  (ref) => SyncStreamService(
+    syncApiRepository: ref.watch(syncApiRepositoryProvider),
+    syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
+    cancelChecker: ref.watch(cancellationProvider),
+  ),
 );
 
 final syncApiRepositoryProvider = Provider(
-  (ref) => SyncApiRepository(
-    ref.watch(apiServiceProvider),
-  ),
+  (ref) => SyncApiRepository(ref.watch(apiServiceProvider)),
+);
+
+final syncStreamRepositoryProvider = Provider(
+  (ref) => DriftSyncStreamRepository(ref.watch(driftProvider)),
 );
diff --git a/mobile/lib/services/auth.service.dart b/mobile/lib/services/auth.service.dart
index 20fa62dc4b..ec053c078b 100644
--- a/mobile/lib/services/auth.service.dart
+++ b/mobile/lib/services/auth.service.dart
@@ -3,12 +3,14 @@ import 'dart:io';
 
 import 'package:hooks_riverpod/hooks_riverpod.dart';
 import 'package:immich_mobile/domain/models/store.model.dart';
+import 'package:immich_mobile/domain/utils/background_sync.dart';
 import 'package:immich_mobile/entities/store.entity.dart';
 import 'package:immich_mobile/interfaces/auth.interface.dart';
 import 'package:immich_mobile/interfaces/auth_api.interface.dart';
 import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart';
 import 'package:immich_mobile/models/auth/login_response.model.dart';
 import 'package:immich_mobile/providers/api.provider.dart';
+import 'package:immich_mobile/providers/background_sync.provider.dart';
 import 'package:immich_mobile/repositories/auth.repository.dart';
 import 'package:immich_mobile/repositories/auth_api.repository.dart';
 import 'package:immich_mobile/services/api.service.dart';
@@ -22,6 +24,7 @@ final authServiceProvider = Provider(
     ref.watch(authRepositoryProvider),
     ref.watch(apiServiceProvider),
     ref.watch(networkServiceProvider),
+    ref.watch(backgroundSyncProvider),
   ),
 );
 
@@ -30,6 +33,7 @@ class AuthService {
   final IAuthRepository _authRepository;
   final ApiService _apiService;
   final NetworkService _networkService;
+  final BackgroundSyncManager _backgroundSyncManager;
 
   final _log = Logger("AuthService");
 
@@ -38,6 +42,7 @@ class AuthService {
     this._authRepository,
     this._apiService,
     this._networkService,
+    this._backgroundSyncManager,
   );
 
   /// Validates the provided server URL by resolving and setting the endpoint.
@@ -115,8 +120,10 @@ class AuthService {
   /// - Asset ETag
   ///
   /// All deletions are executed in parallel using [Future.wait].
-  Future<void> clearLocalData() {
-    return Future.wait([
+  Future<void> clearLocalData() async {
+    // Cancel any ongoing background sync operations before clearing data
+    await _backgroundSyncManager.cancel();
+    await Future.wait([
       _authRepository.clearLocalData(),
       Store.delete(StoreKey.currentUser),
       Store.delete(StoreKey.accessToken),
diff --git a/mobile/lib/utils/bootstrap.dart b/mobile/lib/utils/bootstrap.dart
index 570752c6d9..26f3b49242 100644
--- a/mobile/lib/utils/bootstrap.dart
+++ b/mobile/lib/utils/bootstrap.dart
@@ -48,11 +48,15 @@ abstract final class Bootstrap {
     );
   }
 
-  static Future<void> initDomain(Isar db) async {
+  static Future<void> initDomain(
+    Isar db, {
+    bool shouldBufferLogs = true,
+  }) async {
     await StoreService.init(storeRepository: IsarStoreRepository(db));
     await LogService.init(
       logRepository: IsarLogRepository(db),
       storeRepository: IsarStoreRepository(db),
+      shouldBuffer: shouldBufferLogs,
     );
   }
 }
diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart
new file mode 100644
index 0000000000..cfbb1b544f
--- /dev/null
+++ b/mobile/lib/utils/isolate.dart
@@ -0,0 +1,69 @@
+import 'dart:async';
+import 'dart:ui';
+
+import 'package:flutter/services.dart';
+import 'package:hooks_riverpod/hooks_riverpod.dart';
+import 'package:immich_mobile/providers/db.provider.dart';
+import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
+import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
+import 'package:immich_mobile/utils/bootstrap.dart';
+import 'package:logging/logging.dart';
+import 'package:worker_manager/worker_manager.dart';
+
+class InvalidIsolateUsageException implements Exception {
+  const InvalidIsolateUsageException();
+
+  @override
+  String toString() =>
+      "IsolateHelper should only be used from the root isolate";
+}
+
+// !! Should be used only from the root isolate
+Cancelable<T?> runInIsolateGentle<T>({
+  required Future<T> Function(ProviderContainer ref) computation,
+  String? debugLabel,
+}) {
+  final token = RootIsolateToken.instance;
+  if (token == null) {
+    throw const InvalidIsolateUsageException();
+  }
+
+  return workerManager.executeGentle((cancelledChecker) async {
+    BackgroundIsolateBinaryMessenger.ensureInitialized(token);
+    DartPluginRegistrant.ensureInitialized();
+
+    final db = await Bootstrap.initIsar();
+    await Bootstrap.initDomain(db, shouldBufferLogs: false);
+    final ref = ProviderContainer(
+      overrides: [
+        // TODO: Remove once isar is removed
+        dbProvider.overrideWithValue(db),
+        isarProvider.overrideWithValue(db),
+        cancellationProvider.overrideWithValue(cancelledChecker),
+      ],
+    );
+
+    Logger log = Logger("IsolateLogger");
+
+    try {
+      return await computation(ref);
+    } on CanceledError {
+      log.warning(
+        "Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}",
+      );
+    } catch (error, stack) {
+      log.severe(
+        "Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}",
+        error,
+        stack,
+      );
+    } finally {
+      // Wait for the logs to flush
+      await Future.delayed(const Duration(seconds: 2));
+      // Always close the new db connection on Isolate end
+      ref.read(driftProvider).close();
+      ref.read(isarProvider).close();
+    }
+    return null;
+  });
+}
diff --git a/mobile/pubspec.lock b/mobile/pubspec.lock
index 3731832296..235b3f71c3 100644
--- a/mobile/pubspec.lock
+++ b/mobile/pubspec.lock
@@ -1806,7 +1806,7 @@ packages:
     source: hosted
     version: "3.1.4"
   uuid:
-    dependency: transitive
+    dependency: "direct main"
     description:
       name: uuid
       sha256: a5be9ef6618a7ac1e964353ef476418026db906c4facdedaa299b7a2e71690ff
@@ -1933,6 +1933,14 @@ packages:
       url: "https://pub.dev"
     source: hosted
     version: "0.0.3"
+  worker_manager:
+    dependency: "direct main"
+    description:
+      name: worker_manager
+      sha256: "086ed63e9b36266e851404ca90fd44e37c0f4c9bbf819e5f8d7c87f9741c0591"
+      url: "https://pub.dev"
+    source: hosted
+    version: "7.2.3"
   xdg_directories:
     dependency: transitive
     description:
diff --git a/mobile/pubspec.yaml b/mobile/pubspec.yaml
index 03c39810f6..fdd91e1f87 100644
--- a/mobile/pubspec.yaml
+++ b/mobile/pubspec.yaml
@@ -60,7 +60,9 @@ dependencies:
   thumbhash: 0.1.0+1
   timezone: ^0.9.4
   url_launcher: ^6.3.1
+  uuid: ^4.5.1
   wakelock_plus: ^1.2.10
+  worker_manager: ^7.2.3
 
   native_video_player:
     git:
diff --git a/mobile/test/domain/service.mock.dart b/mobile/test/domain/service.mock.dart
index 53a173fc28..97a3f30294 100644
--- a/mobile/test/domain/service.mock.dart
+++ b/mobile/test/domain/service.mock.dart
@@ -1,7 +1,10 @@
 import 'package:immich_mobile/domain/services/store.service.dart';
 import 'package:immich_mobile/domain/services/user.service.dart';
+import 'package:immich_mobile/domain/utils/background_sync.dart';
 import 'package:mocktail/mocktail.dart';
 
 class MockStoreService extends Mock implements StoreService {}
 
 class MockUserService extends Mock implements UserService {}
+
+class MockBackgroundSyncManager extends Mock implements BackgroundSyncManager {}
diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart
new file mode 100644
index 0000000000..e1d8e6987f
--- /dev/null
+++ b/mobile/test/domain/services/sync_stream_service_test.dart
@@ -0,0 +1,443 @@
+// ignore_for_file: avoid-unnecessary-futures, avoid-async-call-in-sync-function
+
+import 'dart:async';
+
+import 'package:flutter_test/flutter_test.dart';
+import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
+import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
+import 'package:immich_mobile/domain/models/sync_event.model.dart';
+import 'package:immich_mobile/domain/services/sync_stream.service.dart';
+import 'package:mocktail/mocktail.dart';
+import 'package:openapi/api.dart';
+import 'package:worker_manager/worker_manager.dart';
+
+import '../../fixtures/sync_stream.stub.dart';
+import '../../infrastructure/repository.mock.dart';
+
+class _CancellationWrapper {
+  const _CancellationWrapper();
+
+  bool isCancelled() => false;
+}
+
+class _MockCancellationWrapper extends Mock implements _CancellationWrapper {}
+
+void main() {
+  late SyncStreamService sut;
+  late ISyncStreamRepository mockSyncStreamRepo;
+  late ISyncApiRepository mockSyncApiRepo;
+  late StreamController<List<SyncEvent>> streamController;
+
+  successHandler(Invocation _) async => true;
+  failureHandler(Invocation _) async => false;
+
+  setUp(() {
+    mockSyncStreamRepo = MockSyncStreamRepository();
+    mockSyncApiRepo = MockSyncApiRepository();
+    streamController = StreamController<List<SyncEvent>>.broadcast();
+
+    sut = SyncStreamService(
+      syncApiRepository: mockSyncApiRepo,
+      syncStreamRepository: mockSyncStreamRepo,
+    );
+
+    // Default stream setup - emits one batch and closes
+    when(() => mockSyncApiRepo.getSyncEvents(any()))
+        .thenAnswer((_) => streamController.stream);
+
+    // Default ack setup
+    when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {});
+
+    // Register fallbacks for mocktail verification
+    registerFallbackValue(<SyncUserV1>[]);
+    registerFallbackValue(<SyncPartnerV1>[]);
+    registerFallbackValue(<SyncUserDeleteV1>[]);
+    registerFallbackValue(<SyncPartnerDeleteV1>[]);
+
+    // Default successful repository calls
+    when(() => mockSyncStreamRepo.updateUsersV1(any()))
+        .thenAnswer(successHandler);
+    when(() => mockSyncStreamRepo.deleteUsersV1(any()))
+        .thenAnswer(successHandler);
+    when(() => mockSyncStreamRepo.updatePartnerV1(any()))
+        .thenAnswer(successHandler);
+    when(() => mockSyncStreamRepo.deletePartnerV1(any()))
+        .thenAnswer(successHandler);
+  });
+
+  tearDown(() async {
+    if (!streamController.isClosed) {
+      await streamController.close();
+    }
+  });
+
+  // Helper to trigger sync and add events to the stream
+  Future<void> triggerSyncAndEmit(List<SyncEvent> events) async {
+    final future = sut.syncUsers(); // Start listening
+    await Future.delayed(Duration.zero); // Allow listener to attach
+    if (!streamController.isClosed) {
+      streamController.add(events);
+      await streamController.close(); // Close after emitting
+    }
+    await future; // Wait for processing to complete
+  }
+
+  group("SyncStreamService", () {
+    test(
+      "completes successfully when stream emits data and handlers succeed",
+      () async {
+        final events = [
+          ...SyncStreamStub.userEvents,
+          ...SyncStreamStub.partnerEvents,
+        ];
+        final future = triggerSyncAndEmit(events);
+        await expectLater(future, completes);
+        // Verify ack includes last ack from each successfully handled type
+        verify(
+          () =>
+              mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))),
+        ).called(1);
+      },
+    );
+
+    test("completes successfully when stream emits an error", () async {
+      when(() => mockSyncApiRepo.getSyncEvents(any()))
+          .thenAnswer((_) => Stream.error(Exception("Stream Error")));
+      // Should complete gracefully without throwing
+      await expectLater(sut.syncUsers(), throwsException);
+      verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error
+    });
+
+    test("throws when initial getSyncEvents call fails", () async {
+      final apiException = Exception("API Error");
+      when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException);
+      // Should rethrow the exception from the initial call
+      await expectLater(sut.syncUsers(), throwsA(apiException));
+      verifyNever(() => mockSyncApiRepo.ack(any()));
+    });
+
+    test(
+      "completes successfully when a repository handler throws an exception",
+      () async {
+        when(() => mockSyncStreamRepo.updateUsersV1(any()))
+            .thenThrow(Exception("Repo Error"));
+        final events = [
+          ...SyncStreamStub.userEvents,
+          ...SyncStreamStub.partnerEvents,
+        ];
+        // Should complete, but ack only for the successful types
+        await triggerSyncAndEmit(events);
+        // Only partner delete was successful by default setup
+        verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1);
+      },
+    );
+
+    test(
+      "completes successfully but sends no ack when all handlers fail",
+      () async {
+        when(() => mockSyncStreamRepo.updateUsersV1(any()))
+            .thenAnswer(failureHandler);
+        when(() => mockSyncStreamRepo.deleteUsersV1(any()))
+            .thenAnswer(failureHandler);
+        when(() => mockSyncStreamRepo.updatePartnerV1(any()))
+            .thenAnswer(failureHandler);
+        when(() => mockSyncStreamRepo.deletePartnerV1(any()))
+            .thenAnswer(failureHandler);
+
+        final events = [
+          ...SyncStreamStub.userEvents,
+          ...SyncStreamStub.partnerEvents,
+        ];
+        await triggerSyncAndEmit(events);
+        verifyNever(() => mockSyncApiRepo.ack(any()));
+      },
+    );
+
+    test("sends ack only for types where handler returns true", () async {
+      // Mock specific handlers: user update fails, user delete succeeds
+      when(() => mockSyncStreamRepo.updateUsersV1(any()))
+          .thenAnswer(failureHandler);
+      when(() => mockSyncStreamRepo.deleteUsersV1(any()))
+          .thenAnswer(successHandler);
+      // partner update fails, partner delete succeeds
+      when(() => mockSyncStreamRepo.updatePartnerV1(any()))
+          .thenAnswer(failureHandler);
+
+      final events = [
+        ...SyncStreamStub.userEvents,
+        ...SyncStreamStub.partnerEvents,
+      ];
+      await triggerSyncAndEmit(events);
+
+      // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4")
+      verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"]))))
+          .called(1);
+    });
+
+    test("does not process or ack when stream emits an empty list", () async {
+      final future = sut.syncUsers();
+      streamController.add([]); // Emit empty list
+      await streamController.close();
+      await future; // Wait for completion
+
+      verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
+      verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any()));
+      verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any()));
+      verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any()));
+      verifyNever(() => mockSyncApiRepo.ack(any()));
+    });
+
+    test("processes multiple batches sequentially using mutex", () async {
+      final completer1 = Completer<void>();
+      final completer2 = Completer<void>();
+      int callOrder = 0;
+      int handler1StartOrder = -1;
+      int handler2StartOrder = -1;
+      int handler1Calls = 0;
+      int handler2Calls = 0;
+
+      when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
+        handler1Calls++;
+        handler1StartOrder = ++callOrder;
+        await completer1.future;
+        return true;
+      });
+      when(() => mockSyncStreamRepo.updatePartnerV1(any()))
+          .thenAnswer((_) async {
+        handler2Calls++;
+        handler2StartOrder = ++callOrder;
+        await completer2.future;
+        return true;
+      });
+
+      final batch1 = SyncStreamStub.userEvents;
+      final batch2 = SyncStreamStub.partnerEvents;
+
+      final syncFuture = sut.syncUsers();
+      await pumpEventQueue();
+
+      streamController.add(batch1);
+      await pumpEventQueue();
+      // Small delay to ensure the first handler starts
+      await Future.delayed(const Duration(milliseconds: 20));
+
+      expect(handler1StartOrder, 1, reason: "Handler 1 should start first");
+      expect(handler1Calls, 1);
+
+      streamController.add(batch2);
+      await pumpEventQueue();
+      // Small delay
+      await Future.delayed(const Duration(milliseconds: 20));
+
+      expect(handler2StartOrder, -1, reason: "Handler 2 should wait");
+      expect(handler2Calls, 0);
+
+      completer1.complete();
+      await pumpEventQueue(times: 40);
+      // Small delay to ensure the second handler starts
+      await Future.delayed(const Duration(milliseconds: 20));
+
+      expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1");
+      expect(handler2Calls, 1);
+
+      completer2.complete();
+      await pumpEventQueue(times: 40);
+      // Small delay before closing the stream
+      await Future.delayed(const Duration(milliseconds: 20));
+
+      if (!streamController.isClosed) {
+        await streamController.close();
+      }
+      await pumpEventQueue(times: 40);
+      // Small delay to ensure the sync completes
+      await Future.delayed(const Duration(milliseconds: 20));
+
+      await syncFuture;
+
+      verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1);
+      verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1);
+      verify(() => mockSyncApiRepo.ack(any())).called(2);
+    });
+
+    test(
+      "stops processing and ack when cancel checker is completed",
+      () async {
+        final cancellationChecker = _MockCancellationWrapper();
+        when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false);
+
+        sut = SyncStreamService(
+          syncApiRepository: mockSyncApiRepo,
+          syncStreamRepository: mockSyncStreamRepo,
+          cancelChecker: cancellationChecker.isCancelled,
+        );
+
+        final processingCompleter = Completer<void>();
+        bool handlerStarted = false;
+
+        // Make handler wait so we can cancel it mid-flight
+        when(() => mockSyncStreamRepo.deleteUsersV1(any()))
+            .thenAnswer((_) async {
+          handlerStarted = true;
+          await processingCompleter
+              .future; // Wait indefinitely until test completes it
+          return true;
+        });
+
+        final syncFuture = sut.syncUsers();
+        await pumpEventQueue(times: 30);
+
+        streamController.add(SyncStreamStub.userEvents);
+        // Ensure processing starts
+        await Future.delayed(const Duration(milliseconds: 10));
+
+        expect(handlerStarted, isTrue, reason: "Handler should have started");
+
+        when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true);
+
+        // Allow cancellation logic to propagate
+        await Future.delayed(const Duration(milliseconds: 10));
+
+        // Complete the handler's completer after cancellation signal
+        // to ensure the cancellation logic itself isn't blocked by the handler.
+        processingCompleter.complete();
+
+        await expectLater(syncFuture, throwsA(isA<CanceledError>()));
+
+        // Verify that ack was NOT called because processing was cancelled
+        verifyNever(() => mockSyncApiRepo.ack(any()));
+      },
+    );
+
+    test("completes successfully when ack call throws an exception", () async {
+      when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error"));
+      final events = [
+        ...SyncStreamStub.userEvents,
+        ...SyncStreamStub.partnerEvents,
+      ];
+
+      // Should still complete even if ack fails
+      await triggerSyncAndEmit(events);
+      verify(() => mockSyncApiRepo.ack(any()))
+          .called(1); // Verify ack was attempted
+    });
+
+    test("waits for processing to finish if onDone called early", () async {
+      final processingCompleter = Completer<void>();
+      bool handlerFinished = false;
+
+      when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
+        await processingCompleter.future; // Wait inside handler
+        handlerFinished = true;
+        return true;
+      });
+
+      final syncFuture = sut.syncUsers();
+      // Allow listener to attach
+      // This is necessary to ensure the stream is ready to receive events
+      await Future.delayed(Duration.zero);
+
+      streamController.add(SyncStreamStub.userEvents); // Emit batch
+      await Future.delayed(
+        const Duration(milliseconds: 10),
+      ); // Ensure processing starts
+
+      await streamController
+          .close(); // Close stream (triggers onDone internally)
+      await Future.delayed(
+        const Duration(milliseconds: 10),
+      ); // Give onDone a chance to fire
+
+      // At this point, onDone was called, but processing is blocked
+      expect(handlerFinished, isFalse);
+
+      processingCompleter.complete(); // Allow processing to finish
+      await syncFuture; // Now the main future should complete
+
+      expect(handlerFinished, isTrue);
+      verify(() => mockSyncApiRepo.ack(any())).called(1);
+    });
+
+    test("processes events in the defined _kSyncTypeOrder", () async {
+      final future = sut.syncUsers();
+      await pumpEventQueue();
+      if (!streamController.isClosed) {
+        final events = [
+          SyncEvent(
+            type: SyncEntityType.partnerV1,
+            data: SyncStreamStub.partnerV1,
+            ack: "1",
+          ), // Should be processed last
+          SyncEvent(
+            type: SyncEntityType.userV1,
+            data: SyncStreamStub.userV1Admin,
+            ack: "2",
+          ), // Should be processed second
+          SyncEvent(
+            type: SyncEntityType.partnerDeleteV1,
+            data: SyncStreamStub.partnerDeleteV1,
+            ack: "3",
+          ), // Should be processed third
+          SyncEvent(
+            type: SyncEntityType.userDeleteV1,
+            data: SyncStreamStub.userDeleteV1,
+            ack: "4",
+          ), // Should be processed first
+        ];
+
+        streamController.add(events);
+        await streamController.close();
+      }
+      await future;
+
+      verifyInOrder([
+        () => mockSyncStreamRepo.deleteUsersV1(any()),
+        () => mockSyncStreamRepo.updateUsersV1(any()),
+        () => mockSyncStreamRepo.deletePartnerV1(any()),
+        () => mockSyncStreamRepo.updatePartnerV1(any()),
+        // Verify ack happens after all processing
+        () => mockSyncApiRepo.ack(any()),
+      ]);
+    });
+  });
+
+  group("syncUsers", () {
+    test("calls getSyncEvents with correct types", () async {
+      // Need to close the stream for the future to complete
+      final future = sut.syncUsers();
+      await streamController.close();
+      await future;
+
+      verify(
+        () => mockSyncApiRepo.getSyncEvents([
+          SyncRequestType.usersV1,
+          SyncRequestType.partnersV1,
+        ]),
+      ).called(1);
+    });
+
+    test("calls repository methods with correctly grouped data", () async {
+      final events = [
+        ...SyncStreamStub.userEvents,
+        ...SyncStreamStub.partnerEvents,
+      ];
+      await triggerSyncAndEmit(events);
+
+      // Verify each handler was called with the correct list of data payloads
+      verify(
+        () => mockSyncStreamRepo.updateUsersV1(
+          [SyncStreamStub.userV1Admin, SyncStreamStub.userV1User],
+        ),
+      ).called(1);
+      verify(
+        () => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]),
+      ).called(1);
+      verify(
+        () => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]),
+      ).called(1);
+      verify(
+        () => mockSyncStreamRepo
+            .deletePartnerV1([SyncStreamStub.partnerDeleteV1]),
+      ).called(1);
+    });
+  });
+}
diff --git a/mobile/test/fixtures/sync_stream.stub.dart b/mobile/test/fixtures/sync_stream.stub.dart
new file mode 100644
index 0000000000..781e63a2bb
--- /dev/null
+++ b/mobile/test/fixtures/sync_stream.stub.dart
@@ -0,0 +1,45 @@
+import 'package:immich_mobile/domain/models/sync_event.model.dart';
+import 'package:openapi/api.dart';
+
+abstract final class SyncStreamStub {
+  static final userV1Admin = SyncUserV1(
+    deletedAt: DateTime(2020),
+    email: "admin@admin",
+    id: "1",
+    name: "Admin",
+  );
+  static final userV1User = SyncUserV1(
+    deletedAt: DateTime(2021),
+    email: "user@user",
+    id: "2",
+    name: "User",
+  );
+  static final userDeleteV1 = SyncUserDeleteV1(userId: "2");
+  static final userEvents = [
+    SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"),
+    SyncEvent(
+      type: SyncEntityType.userDeleteV1,
+      data: userDeleteV1,
+      ack: "2",
+    ),
+    SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"),
+  ];
+
+  static final partnerV1 = SyncPartnerV1(
+    inTimeline: true,
+    sharedById: "1",
+    sharedWithId: "2",
+  );
+  static final partnerDeleteV1 = SyncPartnerDeleteV1(
+    sharedById: "3",
+    sharedWithId: "4",
+  );
+  static final partnerEvents = [
+    SyncEvent(
+      type: SyncEntityType.partnerDeleteV1,
+      data: partnerDeleteV1,
+      ack: "4",
+    ),
+    SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"),
+  ];
+}
diff --git a/mobile/test/infrastructure/repository.mock.dart b/mobile/test/infrastructure/repository.mock.dart
index 192858adff..c4a5680f71 100644
--- a/mobile/test/infrastructure/repository.mock.dart
+++ b/mobile/test/infrastructure/repository.mock.dart
@@ -1,6 +1,8 @@
 import 'package:immich_mobile/domain/interfaces/device_asset.interface.dart';
 import 'package:immich_mobile/domain/interfaces/log.interface.dart';
 import 'package:immich_mobile/domain/interfaces/store.interface.dart';
+import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
+import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
 import 'package:immich_mobile/domain/interfaces/user.interface.dart';
 import 'package:immich_mobile/domain/interfaces/user_api.interface.dart';
 import 'package:mocktail/mocktail.dart';
@@ -14,5 +16,9 @@ class MockUserRepository extends Mock implements IUserRepository {}
 class MockDeviceAssetRepository extends Mock
     implements IDeviceAssetRepository {}
 
+class MockSyncStreamRepository extends Mock implements ISyncStreamRepository {}
+
 // API Repos
 class MockUserApiRepository extends Mock implements IUserApiRepository {}
+
+class MockSyncApiRepository extends Mock implements ISyncApiRepository {}
diff --git a/mobile/test/service.mocks.dart b/mobile/test/service.mocks.dart
index e1b8df40a3..87a8c01cf0 100644
--- a/mobile/test/service.mocks.dart
+++ b/mobile/test/service.mocks.dart
@@ -29,4 +29,3 @@ class MockSearchApi extends Mock implements SearchApi {}
 class MockAppSettingService extends Mock implements AppSettingsService {}
 
 class MockBackgroundService extends Mock implements BackgroundService {}
-
diff --git a/mobile/test/services/auth.service_test.dart b/mobile/test/services/auth.service_test.dart
index e4f011d940..4ada98a6c9 100644
--- a/mobile/test/services/auth.service_test.dart
+++ b/mobile/test/services/auth.service_test.dart
@@ -8,6 +8,7 @@ import 'package:isar/isar.dart';
 import 'package:mocktail/mocktail.dart';
 import 'package:openapi/api.dart';
 
+import '../domain/service.mock.dart';
 import '../repository.mocks.dart';
 import '../service.mocks.dart';
 import '../test_utils.dart';
@@ -18,6 +19,7 @@ void main() {
   late MockAuthRepository authRepository;
   late MockApiService apiService;
   late MockNetworkService networkService;
+  late MockBackgroundSyncManager backgroundSyncManager;
   late Isar db;
 
   setUp(() async {
@@ -25,12 +27,14 @@ void main() {
     authRepository = MockAuthRepository();
     apiService = MockApiService();
     networkService = MockNetworkService();
+    backgroundSyncManager = MockBackgroundSyncManager();
 
     sut = AuthService(
       authApiRepository,
       authRepository,
       apiService,
       networkService,
+      backgroundSyncManager,
     );
 
     registerFallbackValue(Uri());
@@ -116,24 +120,28 @@ void main() {
   group('logout', () {
     test('Should logout user', () async {
       when(() => authApiRepository.logout()).thenAnswer((_) async => {});
+      when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
       when(() => authRepository.clearLocalData())
           .thenAnswer((_) => Future.value(null));
 
       await sut.logout();
 
       verify(() => authApiRepository.logout()).called(1);
+      verify(() => backgroundSyncManager.cancel()).called(1);
       verify(() => authRepository.clearLocalData()).called(1);
     });
 
     test('Should clear local data even on server error', () async {
       when(() => authApiRepository.logout())
           .thenThrow(Exception('Server error'));
+      when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
       when(() => authRepository.clearLocalData())
           .thenAnswer((_) => Future.value(null));
 
       await sut.logout();
 
       verify(() => authApiRepository.logout()).called(1);
+      verify(() => backgroundSyncManager.cancel()).called(1);
       verify(() => authRepository.clearLocalData()).called(1);
     });
   });