|
|
@@ -28,7 +28,7 @@ protocol NightscoutManager: GlucoseSource {
|
|
|
final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
@Injected() private var keychain: Keychain!
|
|
|
@Injected() private var determinationStorage: DeterminationStorage!
|
|
|
- @Injected() private var glucoseStorage: GlucoseStorage!
|
|
|
+ @Injected() var glucoseStorage: GlucoseStorage!
|
|
|
@Injected() private var tempTargetsStorage: TempTargetsStorage!
|
|
|
@Injected() private var overridesStorage: OverrideStorage!
|
|
|
@Injected() private var carbsStorage: CarbsStorage!
|
|
|
@@ -38,17 +38,69 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
@Injected() private var broadcaster: Broadcaster!
|
|
|
@Injected() private var reachabilityManager: ReachabilityManager!
|
|
|
@Injected() var healthkitManager: HealthKitManager!
|
|
|
+ @Injected() private var bolusCalculationManager: BolusCalculationManager!
|
|
|
+ @Injected() private var apsManager: APSManager!
|
|
|
|
|
|
- private let orefDeterminationSubject = PassthroughSubject<Void, Never>()
|
|
|
- private let uploadOverridesSubject = PassthroughSubject<Void, Never>()
|
|
|
- private let uploadPumpHistorySubject = PassthroughSubject<Void, Never>()
|
|
|
- private let uploadCarbsSubject = PassthroughSubject<Void, Never>()
|
|
|
private let processQueue = DispatchQueue(label: "BaseNetworkManager.processQueue")
|
|
|
private var ping: TimeInterval?
|
|
|
|
|
|
- private var backgroundContext = CoreDataStack.shared.newTaskContext()
|
|
|
+ // Queue where upload pipelines run.
|
|
|
+ let uploadPipelineQueue = DispatchQueue(label: "NightscoutManager.uploadPipelines", qos: .utility)
|
|
|
+
|
|
|
+ // Background Core Data context for fetches used by upload tasks.
|
|
|
+ var backgroundContext = CoreDataStack.shared.newTaskContext()
|
|
|
+
|
|
|
+ /// Throttle window (seconds) per upload pipeline. Any requests inside this window
|
|
|
+ /// coalesce into a single upload run for that pipeline.
|
|
|
+ let uploadPipelineInterval: [NightscoutUploadPipeline: TimeInterval] = [
|
|
|
+ .carbs: 2, .pumpHistory: 2, .overrides: 2, .tempTargets: 2,
|
|
|
+ .glucose: 2, .manualGlucose: 2, .deviceStatus: 2
|
|
|
+ ]
|
|
|
+
|
|
|
+ /// Subjects used to request an upload pipeline. The pipeline applies a throttle so
|
|
|
+ /// close calls don’t double-upload.
|
|
|
+ var uploadPipelineSubjects: [NightscoutUploadPipeline: PassthroughSubject<Void, Never>] = {
|
|
|
+ var d: [NightscoutUploadPipeline: PassthroughSubject<Void, Never>] = [:]
|
|
|
+ NightscoutUploadPipeline.allCases.forEach { d[$0] = PassthroughSubject<Void, Never>() }
|
|
|
+ return d
|
|
|
+ }()
|
|
|
+
|
|
|
+ /// Request an upload for a pipeline (enqueue work). Safe to call from anywhere.
|
|
|
+ func requestUpload(_ uploadPipeline: NightscoutUploadPipeline) {
|
|
|
+ uploadPipelineSubjects[uploadPipeline]?.send(())
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Build the Combine pipelines for all upload pipelines: subject → throttle → upload.
|
|
|
+ /// Must be called once during init().
|
|
|
+ func setupLanePipelines() {
|
|
|
+ for pipeline in NightscoutUploadPipeline.allCases {
|
|
|
+ guard let subject = uploadPipelineSubjects[pipeline], let window = uploadPipelineInterval[pipeline] else { continue }
|
|
|
+ subject
|
|
|
+ .receive(on: uploadPipelineQueue)
|
|
|
+ .throttle(for: .seconds(window), scheduler: uploadPipelineQueue, latest: false)
|
|
|
+ .sink { [weak self] in
|
|
|
+ guard let self else { return }
|
|
|
+ Task(priority: .utility) { await self.runUploadPipeline(pipeline) }
|
|
|
+ }
|
|
|
+ .store(in: &subscriptions)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- private var lifetime = Lifetime()
|
|
|
+ /// Runs the actual upload for a single upload pipeline.
|
|
|
+ /// Called by the throttled pipeline, not directly by callers.
|
|
|
+ func runUploadPipeline(_ uploadPipeline: NightscoutUploadPipeline) async {
|
|
|
+ switch uploadPipeline {
|
|
|
+ case .carbs: await uploadCarbs()
|
|
|
+ case .pumpHistory: await uploadPumpHistory()
|
|
|
+ case .overrides: await uploadOverrides()
|
|
|
+ case .tempTargets: await uploadTempTargets()
|
|
|
+ case .glucose: await uploadGlucose()
|
|
|
+ case .manualGlucose: await uploadManualGlucose()
|
|
|
+ case .deviceStatus:
|
|
|
+ do { try await uploadDeviceStatus() }
|
|
|
+ catch { debug(.nightscout, "deviceStatus upload failed: \(error)") }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
private var isNetworkReachable: Bool {
|
|
|
reachabilityManager.isReachable
|
|
|
@@ -80,11 +132,14 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
private var lastSuggestedDetermination: Determination?
|
|
|
|
|
|
// Queue for handling Core Data change notifications
|
|
|
- private let queue = DispatchQueue(label: "BaseNightscoutManager.queue", qos: .background)
|
|
|
- private var coreDataPublisher: AnyPublisher<Set<NSManagedObjectID>, Never>?
|
|
|
- private var subscriptions = Set<AnyCancellable>()
|
|
|
+ let queue = DispatchQueue(label: "BaseNightscoutManager.queue", qos: .utility)
|
|
|
+
|
|
|
+ /// Emits changed Core Data object IDs from the app. We filter by entity names
|
|
|
+ /// and request upload pipelines based on what changed.
|
|
|
+ var coreDataPublisher: AnyPublisher<Set<NSManagedObjectID>, Never>?
|
|
|
|
|
|
- private let debouncedQueue = DispatchQueue(label: "OrefDeterminationDebounce", qos: .utility)
|
|
|
+ /// Bag for Combine subscriptions owned by this manager.
|
|
|
+ var subscriptions = Set<AnyCancellable>()
|
|
|
|
|
|
init(resolver: Resolver) {
|
|
|
injectServices(resolver)
|
|
|
@@ -96,10 +151,11 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
.share()
|
|
|
.eraseToAnyPublisher()
|
|
|
|
|
|
- registerSubscribers()
|
|
|
- registerHandlers()
|
|
|
setupNotification()
|
|
|
|
|
|
+ setupLanePipelines()
|
|
|
+ wireSubscribers()
|
|
|
+
|
|
|
/// Ensure that Nightscout Manager holds the `lastEnactedDetermination`, if one exists, on initialization.
|
|
|
/// We have to set this here in `init()`, so there's a `lastEnactedDetermination` available after an app restart
|
|
|
/// for `uploadDeviceStatus()`, as within that fuction `lastEnactedDetermination` is reassigned at the very end of the function.
|
|
|
@@ -127,157 +183,6 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func registerHandlers() {
|
|
|
- /// We add debouncing behavior here for two main reasons
|
|
|
- /// 1. To ensure that any upload flag updates have properly been performed, and in subsequent fetching processes only truly unuploaded data is fetched
|
|
|
- /// 2. To not spam the user's NS site with a high number of uploads in a very short amount of time (less than 1sec)
|
|
|
- coreDataPublisher?
|
|
|
- .filteredByEntityName("OrefDetermination")
|
|
|
- .debounce(for: .seconds(2), scheduler: debouncedQueue)
|
|
|
- .sink { [weak self] objectIDs in
|
|
|
- guard let self = self else { return }
|
|
|
-
|
|
|
- // Now hop onto the background context's queue
|
|
|
- self.backgroundContext.perform {
|
|
|
- do {
|
|
|
- // Fetch only those determination objects
|
|
|
- let request: NSFetchRequest<OrefDetermination> = OrefDetermination.fetchRequest()
|
|
|
- request.predicate = NSPredicate(
|
|
|
- format: "SELF IN %@ AND isUploadedToNS == NO",
|
|
|
- objectIDs
|
|
|
- )
|
|
|
- let results = try self.backgroundContext.fetch(request)
|
|
|
-
|
|
|
- // If valid, proceed to send to subject for further processing
|
|
|
- if !results.isEmpty {
|
|
|
- Task {
|
|
|
- do {
|
|
|
- try await self.uploadDeviceStatus()
|
|
|
- } catch {
|
|
|
- debug(.nightscout, "\(DebuggingIdentifiers.failed) failed to upload device status")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } catch {
|
|
|
- debug(.nightscout, "\(DebuggingIdentifiers.failed) Failed to fetch OrefDetermination objects: \(error)")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- .store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("OverrideStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task.detached {
|
|
|
- await self.uploadOverrides()
|
|
|
- }
|
|
|
- }.store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("OverrideRunStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task.detached {
|
|
|
- await self.uploadOverrides()
|
|
|
- }
|
|
|
- }.store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("TempTargetStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task.detached {
|
|
|
- await self.uploadTempTargets()
|
|
|
- }
|
|
|
- }.store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("TempTargetRunStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task.detached {
|
|
|
- await self.uploadTempTargets()
|
|
|
- }
|
|
|
- }.store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("PumpEventStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] objectIDs in
|
|
|
- guard let self = self else { return }
|
|
|
-
|
|
|
- self.backgroundContext.perform {
|
|
|
- do {
|
|
|
- let request: NSFetchRequest<PumpEventStored> = PumpEventStored.fetchRequest()
|
|
|
- request.predicate = NSPredicate(
|
|
|
- format: "SELF IN %@ AND isUploadedToNS == NO",
|
|
|
- objectIDs
|
|
|
- )
|
|
|
- let results = try self.backgroundContext.fetch(request)
|
|
|
-
|
|
|
- if !results.isEmpty {
|
|
|
- Task.detached {
|
|
|
- await self.uploadPumpHistory()
|
|
|
- }
|
|
|
- }
|
|
|
- } catch {
|
|
|
- debugPrint("Failed to fetch PumpEventStored objects: \(error)")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- .store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("CarbEntryStored")
|
|
|
- .debounce(for: .seconds(2), scheduler: DispatchQueue.global(qos: .background))
|
|
|
- .sink { [weak self] objectIDs in
|
|
|
- guard let self = self else { return }
|
|
|
-
|
|
|
- // Now hop onto the background context’s queue
|
|
|
- self.backgroundContext.perform {
|
|
|
- do {
|
|
|
- let request: NSFetchRequest<CarbEntryStored> = CarbEntryStored.fetchRequest()
|
|
|
- request.predicate = NSPredicate(
|
|
|
- format: "SELF IN %@ AND isUploadedToNS == NO",
|
|
|
- objectIDs
|
|
|
- )
|
|
|
- let results = try self.backgroundContext.fetch(request)
|
|
|
-
|
|
|
- // If valid, proceed to send to subject for further processing
|
|
|
- if !results.isEmpty {
|
|
|
- Task.detached {
|
|
|
- await self.uploadCarbs()
|
|
|
- }
|
|
|
- }
|
|
|
- } catch {
|
|
|
- debugPrint("Failed to fetch CarbEntryStored objects: \(error)")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- .store(in: &subscriptions)
|
|
|
-
|
|
|
- coreDataPublisher?.filteredByEntityName("GlucoseStored")
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task.detached {
|
|
|
- await self.uploadGlucose()
|
|
|
- await self.uploadManualGlucose()
|
|
|
- }
|
|
|
- }
|
|
|
- .store(in: &subscriptions)
|
|
|
- }
|
|
|
-
|
|
|
- func registerSubscribers() {
|
|
|
- glucoseStorage.updatePublisher
|
|
|
- .receive(on: queue)
|
|
|
- .sink { [weak self] _ in
|
|
|
- guard let self = self else { return }
|
|
|
- Task {
|
|
|
- await self.uploadGlucose()
|
|
|
- }
|
|
|
- }
|
|
|
- .store(in: &subscriptions)
|
|
|
- }
|
|
|
-
|
|
|
func setupNotification() {
|
|
|
Foundation.NotificationCenter.default.publisher(for: .willUpdateOverrideConfiguration)
|
|
|
.sink { [weak self] _ in
|
|
|
@@ -588,6 +493,29 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
fetchedEnactedDetermination = enacted
|
|
|
}
|
|
|
|
|
|
+ // Calculate recommended bolus
|
|
|
+ var recommendedBolus: Decimal = 0
|
|
|
+
|
|
|
+ if let latest = fetchedSuggestedDetermination ?? fetchedEnactedDetermination {
|
|
|
+ let minPredBG = latest.minPredBGFromReason ?? 0
|
|
|
+ let simulatedCOB: Int16? = latest.cob.map { Int16(truncating: NSDecimalNumber(decimal: $0)) }
|
|
|
+
|
|
|
+ let result = await bolusCalculationManager.handleBolusCalculation(
|
|
|
+ carbs: 0,
|
|
|
+ useFattyMealCorrection: false,
|
|
|
+ useSuperBolus: false,
|
|
|
+ lastLoopDate: apsManager.lastLoopDate,
|
|
|
+ minPredBG: minPredBG,
|
|
|
+ simulatedCOB: simulatedCOB,
|
|
|
+ isBackdated: false
|
|
|
+ )
|
|
|
+
|
|
|
+ recommendedBolus = apsManager.roundBolus(amount: result.insulinCalculated)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Bolus increment
|
|
|
+ let bolusIncrement = settingsManager.preferences.bolusIncrement
|
|
|
+
|
|
|
// Gather all relevant data for OpenAPS Status
|
|
|
let iob = await fetchedIOBEntry
|
|
|
|
|
|
@@ -598,7 +526,8 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
iob: iob?.first,
|
|
|
suggested: suggestedToUpload,
|
|
|
enacted: settingsManager.settings.closedLoop ? enactedToUpload : nil,
|
|
|
- version: Bundle.main.releaseVersionNumber ?? "Unknown"
|
|
|
+ version: Bundle.main.releaseVersionNumber ?? "Unknown",
|
|
|
+ recommendedBolus: recommendedBolus
|
|
|
)
|
|
|
|
|
|
debug(.nightscout, "To be uploaded openapsStatus: \(openapsStatus)")
|
|
|
@@ -611,7 +540,8 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
clock: Date(),
|
|
|
battery: battery,
|
|
|
reservoir: reservoir != 0xDEAD_BEEF ? reservoir : nil,
|
|
|
- status: pumpStatus
|
|
|
+ status: pumpStatus,
|
|
|
+ bolusIncrement: bolusIncrement
|
|
|
)
|
|
|
|
|
|
let batteryLevel = await UIDevice.current.batteryLevel
|