Просмотр исходного кода

Nightscout: extract subscribers to extension, add lane-based kick (2s throttle) + APS triggers uploads

Jonas Björkert 7 месяцев назад
Родитель
Сommit
4c5f0e1d37

+ 8 - 0
Trio.xcodeproj/project.pbxproj

@@ -607,6 +607,8 @@
 		DD82D4B82DCAB2BA00BAFC77 /* PropertyPersistentFlags.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD82D4B72DCAB2BA00BAFC77 /* PropertyPersistentFlags.swift */; };
 		DD868FD82E381A54005D3308 /* APNSJWTClaims.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD868FD72E381A54005D3308 /* APNSJWTClaims.swift */; };
 		DD88C8E22C50420800F2D558 /* DefinitionRow.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD88C8E12C50420800F2D558 /* DefinitionRow.swift */; };
+		DD906BF42EA6AA0100262772 /* NightscoutLane.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD906BF32EA6AA0100262772 /* NightscoutLane.swift */; };
+		DD906BF62EA6AAE900262772 /* BaseNightscoutManager+Subscribers.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD906BF52EA6AAE900262772 /* BaseNightscoutManager+Subscribers.swift */; };
 		DD940BAA2CA7585D000830A5 /* GlucoseColorScheme.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD940BA92CA7585D000830A5 /* GlucoseColorScheme.swift */; };
 		DD940BAC2CA75889000830A5 /* DynamicGlucoseColor.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD940BAB2CA75889000830A5 /* DynamicGlucoseColor.swift */; };
 		DD98ACC02D71013200C0778F /* StatChartUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = DD98ACBF2D71013200C0778F /* StatChartUtils.swift */; };
@@ -1434,6 +1436,8 @@
 		DD82D4B72DCAB2BA00BAFC77 /* PropertyPersistentFlags.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PropertyPersistentFlags.swift; sourceTree = "<group>"; };
 		DD868FD72E381A54005D3308 /* APNSJWTClaims.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = APNSJWTClaims.swift; sourceTree = "<group>"; };
 		DD88C8E12C50420800F2D558 /* DefinitionRow.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DefinitionRow.swift; sourceTree = "<group>"; };
+		DD906BF32EA6AA0100262772 /* NightscoutLane.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NightscoutLane.swift; sourceTree = "<group>"; };
+		DD906BF52EA6AAE900262772 /* BaseNightscoutManager+Subscribers.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "BaseNightscoutManager+Subscribers.swift"; sourceTree = "<group>"; };
 		DD940BA92CA7585D000830A5 /* GlucoseColorScheme.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GlucoseColorScheme.swift; sourceTree = "<group>"; };
 		DD940BAB2CA75889000830A5 /* DynamicGlucoseColor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DynamicGlucoseColor.swift; sourceTree = "<group>"; };
 		DD98ACBF2D71013200C0778F /* StatChartUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StatChartUtils.swift; sourceTree = "<group>"; };
@@ -3472,8 +3476,10 @@
 		DDC9B9962CFD2332003E7721 /* Nightscout */ = {
 			isa = PBXGroup;
 			children = (
+				DD906BF32EA6AA0100262772 /* NightscoutLane.swift */,
 				3811DE9725C9D88300A708ED /* NightscoutManager.swift */,
 				38FE826C25CC8461001FF17A /* NightscoutAPI.swift */,
+				DD906BF52EA6AAE900262772 /* BaseNightscoutManager+Subscribers.swift */,
 			);
 			path = Nightscout;
 			sourceTree = "<group>";
@@ -4129,6 +4135,7 @@
 				F90692CF274B999A0037068D /* HealthKitDataFlow.swift in Sources */,
 				CE7CA3552A064973004BE681 /* ListStateIntent.swift in Sources */,
 				C28DD7262DBA9A9E00EC02DD /* GlucosePercentileDetailView.swift in Sources */,
+				DD906BF42EA6AA0100262772 /* NightscoutLane.swift in Sources */,
 				BDF530D82B40F8AC002CAF43 /* LockScreenView.swift in Sources */,
 				195D80B72AF697B800D25097 /* DynamicSettingsDataFlow.swift in Sources */,
 				DD98ACC02D71013200C0778F /* StatChartUtils.swift in Sources */,
@@ -4436,6 +4443,7 @@
 				BD7DA9AC2AE06EB900601B20 /* BolusCalculatorConfigRootView.swift in Sources */,
 				AD3D2CD42CD01B9EB8F26522 /* PumpConfigDataFlow.swift in Sources */,
 				DD17452E2C55AE4800211FAC /* TargetBehavoirDataFlow.swift in Sources */,
+				DD906BF62EA6AAE900262772 /* BaseNightscoutManager+Subscribers.swift in Sources */,
 				53F2382465BF74DB1A967C8B /* PumpConfigProvider.swift in Sources */,
 				5D16287A969E64D18CE40E44 /* PumpConfigStateModel.swift in Sources */,
 				DDF6902C2DA028D3008BF16C /* DiagnosticsStepView.swift in Sources */,

+ 5 - 0
Trio/Sources/APS/APSManager.swift

@@ -217,6 +217,11 @@ final class BaseAPSManager: APSManager, Injectable {
             do {
                 // Execute loop logic
                 try await self.executeLoop(loopStatRecord: &loopStatRecord)
+
+                requestNightscoutUpload(
+                    [.carbs, .pumpHistory, .overrides, .tempTargets],
+                    source: "APSManager"
+                )
             } catch {
                 var updatedStats = loopStatRecord
                 updatedStats.end = Date()

+ 83 - 0
Trio/Sources/Services/Network/Nightscout/BaseNightscoutManager+Subscribers.swift

@@ -0,0 +1,83 @@
+import Combine
+import CoreData
+import Foundation
+
+extension BaseNightscoutManager {
+    func wireSubscribers() {
+        wireExternalUploadRequests()
+        wireCoreDataSubscribers()
+        wireGlucoseStorageSubscriber()
+    }
+
+    func wireExternalUploadRequests() {
+        Foundation.NotificationCenter.default.publisher(for: .nightscoutUploadRequested)
+            .sink { [weak self] note in
+                guard let self else { return }
+                let lanes = (note.userInfo?[NightscoutNotificationKey.lanes] as? [String])?
+                    .compactMap(NightscoutLane.init(rawValue:)) ?? []
+
+                for lane in lanes { self.kick(lane) }
+
+                var info: [AnyHashable: Any] = [NightscoutNotificationKey.lanes: lanes.map(\.rawValue)]
+                if let src = note.userInfo?[NightscoutNotificationKey.source] { info[NightscoutNotificationKey.source] = src }
+                Foundation.NotificationCenter.default.post(name: .nightscoutUploadDidFinish, object: nil, userInfo: info)
+            }
+            .store(in: &subscriptions)
+    }
+
+    func wireCoreDataSubscribers() {
+        coreDataPublisher?
+            .filteredByEntityName("OrefDetermination")
+            .sink { [weak self] _ in self?.kick(.deviceStatus) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("OverrideStored")
+            .sink { [weak self] _ in self?.kick(.overrides) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("OverrideRunStored")
+            .sink { [weak self] _ in self?.kick(.overrides) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("TempTargetStored")
+            .sink { [weak self] _ in self?.kick(.tempTargets) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("TempTargetRunStored")
+            .sink { [weak self] _ in self?.kick(.tempTargets) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("PumpEventStored")
+            .sink { [weak self] _ in self?.kick(.pumpHistory) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("CarbEntryStored")
+            .sink { [weak self] _ in self?.kick(.carbs) }
+            .store(in: &subscriptions)
+
+        coreDataPublisher?
+            .filteredByEntityName("GlucoseStored")
+            .sink { [weak self] _ in
+                self?.kick(.glucose)
+                self?.kick(.manualGlucose)
+            }
+            .store(in: &subscriptions)
+    }
+
+    // MARK: 3) Glucose storage tick → kick glucose lane
+
+    func wireGlucoseStorageSubscriber() {
+        glucoseStorage.updatePublisher
+            .receive(on: queue)
+            .sink { [weak self] _ in
+                self?.kick(.glucose)
+            }
+            .store(in: &subscriptions)
+    }
+}

+ 28 - 0
Trio/Sources/Services/Network/Nightscout/NightscoutLane.swift

@@ -0,0 +1,28 @@
+import Foundation
+
+public enum NightscoutLane: String, CaseIterable {
+    case carbs
+    case pumpHistory
+    case overrides
+    case tempTargets
+    case glucose
+    case manualGlucose
+    case deviceStatus
+}
+
+public enum NightscoutNotificationKey {
+    public static let lanes = "lanes"
+    public static let source = "source"
+}
+
+public extension Foundation.Notification.Name {
+    static let nightscoutUploadRequested = Notification.Name("nightscoutUploadRequested")
+    static let nightscoutUploadDidFinish = Notification.Name("nightscoutUploadDidFinish")
+}
+
+/// Simple helper any component (e.g. APSManager) can call.
+public func requestNightscoutUpload(_ lanes: [NightscoutLane], source: String? = nil) {
+    var userInfo: [AnyHashable: Any] = [NightscoutNotificationKey.lanes: lanes.map(\.rawValue)]
+    if let source { userInfo[NightscoutNotificationKey.source] = source }
+    Foundation.NotificationCenter.default.post(name: .nightscoutUploadRequested, object: nil, userInfo: userInfo)
+}

+ 52 - 165
Trio/Sources/Services/Network/Nightscout/NightscoutManager.swift

@@ -28,7 +28,7 @@ protocol NightscoutManager: GlucoseSource {
 final class BaseNightscoutManager: NightscoutManager, Injectable {
     @Injected() private var keychain: Keychain!
     @Injected() private var determinationStorage: DeterminationStorage!
-    @Injected() private var glucoseStorage: GlucoseStorage!
+    @Injected() var glucoseStorage: GlucoseStorage!
     @Injected() private var tempTargetsStorage: TempTargetsStorage!
     @Injected() private var overridesStorage: OverrideStorage!
     @Injected() private var carbsStorage: CarbsStorage!
@@ -41,16 +41,55 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
     @Injected() private var bolusCalculationManager: BolusCalculationManager!
     @Injected() private var apsManager: APSManager!
 
-    private let orefDeterminationSubject = PassthroughSubject<Void, Never>()
-    private let uploadOverridesSubject = PassthroughSubject<Void, Never>()
-    private let uploadPumpHistorySubject = PassthroughSubject<Void, Never>()
-    private let uploadCarbsSubject = PassthroughSubject<Void, Never>()
     private let processQueue = DispatchQueue(label: "BaseNetworkManager.processQueue")
     private var ping: TimeInterval?
 
-    private var backgroundContext = CoreDataStack.shared.newTaskContext()
+    let laneQueue = DispatchQueue(label: "NightscoutManager.lanes", qos: .utility)
 
-    private var lifetime = Lifetime()
+    var backgroundContext = CoreDataStack.shared.newTaskContext()
+
+    let laneInterval: [NightscoutLane: TimeInterval] = [
+        .carbs: 2, .pumpHistory: 2, .overrides: 2, .tempTargets: 2,
+        .glucose: 2, .manualGlucose: 2, .deviceStatus: 2
+    ]
+
+    var laneSubjects: [NightscoutLane: PassthroughSubject<Void, Never>] = {
+        var d: [NightscoutLane: PassthroughSubject<Void, Never>] = [:]
+        NightscoutLane.allCases.forEach { d[$0] = PassthroughSubject<Void, Never>() }
+        return d
+    }()
+
+    func kick(_ lane: NightscoutLane) {
+        laneSubjects[lane]?.send(())
+    }
+
+    func setupLanePipelines() {
+        for lane in NightscoutLane.allCases {
+            guard let subject = laneSubjects[lane], let window = laneInterval[lane] else { continue }
+            subject
+                .receive(on: laneQueue)
+                .throttle(for: .seconds(window), scheduler: laneQueue, latest: false)
+                .sink { [weak self] in
+                    guard let self else { return }
+                    Task(priority: .utility) { await self.runLane(lane) }
+                }
+                .store(in: &subscriptions)
+        }
+    }
+
+    func runLane(_ lane: NightscoutLane) async {
+        switch lane {
+        case .carbs: await uploadCarbs()
+        case .pumpHistory: await uploadPumpHistory()
+        case .overrides: await uploadOverrides()
+        case .tempTargets: await uploadTempTargets()
+        case .glucose: await uploadGlucose()
+        case .manualGlucose: await uploadManualGlucose()
+        case .deviceStatus:
+            do { try await uploadDeviceStatus() }
+            catch { debug(.nightscout, "deviceStatus upload failed: \(error)") }
+        }
+    }
 
     private var isNetworkReachable: Bool {
         reachabilityManager.isReachable
@@ -82,11 +121,9 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
     private var lastSuggestedDetermination: Determination?
 
     // Queue for handling Core Data change notifications
-    private let queue = DispatchQueue(label: "BaseNightscoutManager.queue", qos: .background)
-    private var coreDataPublisher: AnyPublisher<Set<NSManagedObjectID>, Never>?
-    private var subscriptions = Set<AnyCancellable>()
-
-    private let debouncedQueue = DispatchQueue(label: "OrefDeterminationDebounce", qos: .utility)
+    let queue = DispatchQueue(label: "BaseNightscoutManager.queue", qos: .utility)
+    var coreDataPublisher: AnyPublisher<Set<NSManagedObjectID>, Never>?
+    var subscriptions = Set<AnyCancellable>()
 
     init(resolver: Resolver) {
         injectServices(resolver)
@@ -98,10 +135,11 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
                 .share()
                 .eraseToAnyPublisher()
 
-        registerSubscribers()
-        registerHandlers()
         setupNotification()
 
+        setupLanePipelines()
+        wireSubscribers()
+
         /// Ensure that Nightscout Manager holds the `lastEnactedDetermination`, if one exists, on initialization.
         /// We have to set this here in `init()`, so there's a `lastEnactedDetermination` available after an app restart
         /// for `uploadDeviceStatus()`, as within that fuction `lastEnactedDetermination` is reassigned at the very end of the function.
@@ -129,157 +167,6 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
         }
     }
 
-    private func registerHandlers() {
-        /// We add debouncing behavior here for two main reasons
-        /// 1. To ensure that any upload flag updates have properly been performed, and in subsequent fetching processes only truly unuploaded data is fetched
-        /// 2. To not spam the user's NS site with a high number of uploads in a very short amount of time (less than 1sec)
-        coreDataPublisher?
-            .filteredByEntityName("OrefDetermination")
-            .debounce(for: .seconds(2), scheduler: debouncedQueue)
-            .sink { [weak self] objectIDs in
-                guard let self = self else { return }
-
-                // Now hop onto the background context's queue
-                self.backgroundContext.perform {
-                    do {
-                        // Fetch only those determination objects
-                        let request: NSFetchRequest<OrefDetermination> = OrefDetermination.fetchRequest()
-                        request.predicate = NSPredicate(
-                            format: "SELF IN %@ AND isUploadedToNS == NO",
-                            objectIDs
-                        )
-                        let results = try self.backgroundContext.fetch(request)
-
-                        // If valid, proceed to send to subject for further processing
-                        if !results.isEmpty {
-                            Task {
-                                do {
-                                    try await self.uploadDeviceStatus()
-                                } catch {
-                                    debug(.nightscout, "\(DebuggingIdentifiers.failed) failed to upload device status")
-                                }
-                            }
-                        }
-                    } catch {
-                        debug(.nightscout, "\(DebuggingIdentifiers.failed) Failed to fetch OrefDetermination objects: \(error)")
-                    }
-                }
-            }
-            .store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("OverrideStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task.detached {
-                    await self.uploadOverrides()
-                }
-            }.store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("OverrideRunStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task.detached {
-                    await self.uploadOverrides()
-                }
-            }.store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("TempTargetStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task.detached {
-                    await self.uploadTempTargets()
-                }
-            }.store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("TempTargetRunStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task.detached {
-                    await self.uploadTempTargets()
-                }
-            }.store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("PumpEventStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] objectIDs in
-                guard let self = self else { return }
-
-                self.backgroundContext.perform {
-                    do {
-                        let request: NSFetchRequest<PumpEventStored> = PumpEventStored.fetchRequest()
-                        request.predicate = NSPredicate(
-                            format: "SELF IN %@ AND isUploadedToNS == NO",
-                            objectIDs
-                        )
-                        let results = try self.backgroundContext.fetch(request)
-
-                        if !results.isEmpty {
-                            Task.detached {
-                                await self.uploadPumpHistory()
-                            }
-                        }
-                    } catch {
-                        debugPrint("Failed to fetch PumpEventStored objects: \(error)")
-                    }
-                }
-            }
-            .store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("CarbEntryStored")
-            .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
-            .sink { [weak self] objectIDs in
-                guard let self = self else { return }
-
-                // Now hop onto the background context’s queue
-                self.backgroundContext.perform {
-                    do {
-                        let request: NSFetchRequest<CarbEntryStored> = CarbEntryStored.fetchRequest()
-                        request.predicate = NSPredicate(
-                            format: "SELF IN %@ AND isUploadedToNS == NO",
-                            objectIDs
-                        )
-                        let results = try self.backgroundContext.fetch(request)
-
-                        // If valid, proceed to send to subject for further processing
-                        if !results.isEmpty {
-                            Task.detached {
-                                await self.uploadCarbs()
-                            }
-                        }
-                    } catch {
-                        debugPrint("Failed to fetch CarbEntryStored objects: \(error)")
-                    }
-                }
-            }
-            .store(in: &subscriptions)
-
-        coreDataPublisher?.filteredByEntityName("GlucoseStored")
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task.detached {
-                    await self.uploadGlucose()
-                    await self.uploadManualGlucose()
-                }
-            }
-            .store(in: &subscriptions)
-    }
-
-    func registerSubscribers() {
-        glucoseStorage.updatePublisher
-            .receive(on: queue)
-            .sink { [weak self] _ in
-                guard let self = self else { return }
-                Task {
-                    await self.uploadGlucose()
-                }
-            }
-            .store(in: &subscriptions)
-    }
-
     func setupNotification() {
         Foundation.NotificationCenter.default.publisher(for: .willUpdateOverrideConfiguration)
             .sink { [weak self] _ in