|
|
@@ -70,6 +70,52 @@ enum APSError: LocalizedError {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// MARK: - Thread-safe loop serialization
|
|
|
+
|
|
|
+/// Ensures only one loop runs at a time via actor isolation.
|
|
|
+/// Replaces the non-atomic check-then-act pattern on `isLooping` CurrentValueSubject.
|
|
|
+private actor LoopGuard {
|
|
|
+ private var isRunning = false
|
|
|
+
|
|
|
+ /// Atomically checks whether a new loop can start and marks it as running if so.
|
|
|
+ func tryStart(minInterval: TimeInterval, lastLoopDate: Date, lastLoopStartDate: Date) -> Bool {
|
|
|
+ // If the last loop completed after it started, enforce minimum interval
|
|
|
+ if lastLoopDate > lastLoopStartDate {
|
|
|
+ guard lastLoopStartDate.addingTimeInterval(minInterval) < Date() else { return false }
|
|
|
+ }
|
|
|
+ guard !isRunning else { return false }
|
|
|
+ isRunning = true
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ func finish() {
|
|
|
+ isRunning = false
|
|
|
+ }
|
|
|
+
|
|
|
+ var running: Bool { isRunning }
|
|
|
+}
|
|
|
+
|
|
|
+// MARK: - Thread-safe bolus progress state
|
|
|
+
|
|
|
+/// Isolates the mutable `DoseProgressReporter` reference to prevent
|
|
|
+/// data races between Combine sinks (processQueue) and async contexts (cancelBolus).
|
|
|
+private actor BolusProgressState {
|
|
|
+ private var reporter: DoseProgressReporter?
|
|
|
+ private weak var observer: (any DoseProgressObserver)?
|
|
|
+
|
|
|
+ func setReporter(_ newReporter: DoseProgressReporter?, observer: any DoseProgressObserver) {
|
|
|
+ reporter?.removeObserver(observer)
|
|
|
+ reporter = newReporter
|
|
|
+ reporter?.addObserver(observer)
|
|
|
+ self.observer = observer
|
|
|
+ }
|
|
|
+
|
|
|
+ func clear() {
|
|
|
+ if let observer { reporter?.removeObserver(observer) }
|
|
|
+ reporter = nil
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
final class BaseAPSManager: APSManager, Injectable {
|
|
|
private let processQueue = DispatchQueue(label: "BaseAPSManager.processQueue")
|
|
|
@Injected() private var storage: FileStorage!
|
|
|
@@ -96,7 +142,8 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
|
|
|
private var lifetime = Lifetime()
|
|
|
|
|
|
- private var backgroundTaskID: UIBackgroundTaskIdentifier?
|
|
|
+ private let loopGuard = LoopGuard()
|
|
|
+ private let bolusProgressState = BolusProgressState()
|
|
|
|
|
|
var pumpManager: PumpManagerUI? {
|
|
|
get { deviceDataManager.pumpManager }
|
|
|
@@ -168,7 +215,6 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
}
|
|
|
|
|
|
deviceDataManager.recommendsLoop
|
|
|
- .receive(on: processQueue)
|
|
|
.sink { [weak self] in
|
|
|
self?.loop()
|
|
|
}
|
|
|
@@ -178,46 +224,45 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
deviceDataManager.errorSubject
|
|
|
.receive(on: processQueue)
|
|
|
.map { APSError.pumpError($0) }
|
|
|
- .sink {
|
|
|
- self.processError($0)
|
|
|
+ .sink { [weak self] in
|
|
|
+ self?.processError($0)
|
|
|
}
|
|
|
.store(in: &lifetime)
|
|
|
|
|
|
deviceDataManager.bolusTrigger
|
|
|
.receive(on: processQueue)
|
|
|
- .sink { bolusing in
|
|
|
+ .sink { [weak self] bolusing in
|
|
|
if bolusing {
|
|
|
- self.createBolusReporter()
|
|
|
+ self?.createBolusReporter()
|
|
|
} else {
|
|
|
- self.clearBolusReporter()
|
|
|
+ self?.clearBolusReporter()
|
|
|
}
|
|
|
}
|
|
|
.store(in: &lifetime)
|
|
|
|
|
|
deviceDataManager.scheduledBasal
|
|
|
.receive(on: processQueue)
|
|
|
- .sink { scheduledBasal in
|
|
|
- self.isScheduledBasal = scheduledBasal
|
|
|
+ .sink { [weak self] scheduledBasal in
|
|
|
+ self?.isScheduledBasal = scheduledBasal
|
|
|
}
|
|
|
.store(in: &lifetime)
|
|
|
|
|
|
deviceDataManager.suspended
|
|
|
.receive(on: processQueue)
|
|
|
- .sink { suspended in
|
|
|
- self.isSuspended = suspended
|
|
|
+ .sink { [weak self] suspended in
|
|
|
+ self?.isSuspended = suspended
|
|
|
}
|
|
|
.store(in: &lifetime)
|
|
|
|
|
|
// manage a manual Temp Basal from PumpManager - force loop() after manual temp basal is cancelled or finishes
|
|
|
deviceDataManager.manualTempBasal
|
|
|
- .receive(on: processQueue)
|
|
|
- .sink { manualBasal in
|
|
|
+ .sink { [weak self] manualBasal in
|
|
|
if manualBasal {
|
|
|
- self.isManualTempBasal = true
|
|
|
+ self?.isManualTempBasal = true
|
|
|
} else {
|
|
|
- if self.isManualTempBasal {
|
|
|
- self.isManualTempBasal = false
|
|
|
- self.loop()
|
|
|
+ if self?.isManualTempBasal == true {
|
|
|
+ self?.isManualTempBasal = false
|
|
|
+ self?.loop()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -233,116 +278,80 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
Task { [weak self] in
|
|
|
guard let self else { return }
|
|
|
|
|
|
- // Check if we can start a new loop
|
|
|
- guard await self.canStartNewLoop() else { return }
|
|
|
+ // Atomic check-and-set via actor — eliminates the race between
|
|
|
+ // checking isLooping.value and sending isLooping(true)
|
|
|
+ guard await loopGuard.tryStart(
|
|
|
+ minInterval: Config.loopInterval,
|
|
|
+ lastLoopDate: lastLoopDate,
|
|
|
+ lastLoopStartDate: lastLoopStartDate
|
|
|
+ ) else {
|
|
|
+ debug(.apsManager, "Loop skipped (already running or too soon)")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- // Setup loop and background task
|
|
|
- var (loopStatRecord, backgroundTask) = await self.setupLoop()
|
|
|
+ // Local background task — no shared mutable state
|
|
|
+ let bgTask = await UIApplication.shared.beginBackgroundTask(withName: "Loop starting")
|
|
|
+ isLooping.send(true)
|
|
|
|
|
|
- do {
|
|
|
- // Execute loop logic
|
|
|
- try await self.executeLoop(loopStatRecord: &loopStatRecord)
|
|
|
+ let loopStartDate = Date()
|
|
|
+ lastLoopStartDate = loopStartDate
|
|
|
+ let interval = await calculateLoopInterval(loopStartDate: loopStartDate)
|
|
|
|
|
|
+ var loopStatRecord = LoopStats(
|
|
|
+ start: loopStartDate,
|
|
|
+ loopStatus: "Starting",
|
|
|
+ interval: interval
|
|
|
+ )
|
|
|
+
|
|
|
+ defer {
|
|
|
+ if let bgTask = Optional(bgTask), bgTask != .invalid {
|
|
|
+ Task { await UIApplication.shared.endBackgroundTask(bgTask) }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ do {
|
|
|
+ try await executeLoop(loopStatRecord: &loopStatRecord)
|
|
|
requestNightscoutUpload(
|
|
|
[.carbs, .pumpHistory, .overrides, .tempTargets],
|
|
|
source: "APSManager"
|
|
|
)
|
|
|
+ await finalizeLoop(loopStatRecord: loopStatRecord)
|
|
|
} catch {
|
|
|
- var updatedStats = loopStatRecord
|
|
|
- updatedStats.end = Date()
|
|
|
- updatedStats.duration = roundDouble((updatedStats.end! - updatedStats.start).timeInterval / 60, 2)
|
|
|
- updatedStats.loopStatus = error.localizedDescription
|
|
|
- await loopCompleted(error: error, loopStatRecord: updatedStats)
|
|
|
+ loopStatRecord.end = Date()
|
|
|
+ loopStatRecord.duration = roundDouble((loopStatRecord.end! - loopStatRecord.start).timeInterval / 60, 2)
|
|
|
+ loopStatRecord.loopStatus = error.localizedDescription
|
|
|
+ await finalizeLoop(error: error, loopStatRecord: loopStatRecord)
|
|
|
debug(.apsManager, "\(DebuggingIdentifiers.failed) Failed to complete Loop: \(error)")
|
|
|
}
|
|
|
-
|
|
|
- // Cleanup background task
|
|
|
- if let backgroundTask = backgroundTask {
|
|
|
- await UIApplication.shared.endBackgroundTask(backgroundTask)
|
|
|
- self.backgroundTaskID = .invalid
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private func canStartNewLoop() async -> Bool {
|
|
|
- // Check if too soon for next loop
|
|
|
- if lastLoopDate > lastLoopStartDate {
|
|
|
- guard lastLoopStartDate.addingTimeInterval(Config.loopInterval) < Date() else {
|
|
|
- debug(.apsManager, "Not enough time have passed since last loop at : \(lastLoopStartDate)")
|
|
|
- return false
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Check if loop already in progress
|
|
|
- guard !isLooping.value else {
|
|
|
- warning(.apsManager, "Loop already in progress. Skip recommendation.")
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- return true
|
|
|
- }
|
|
|
-
|
|
|
- private func setupLoop() async -> (LoopStats, UIBackgroundTaskIdentifier?) {
|
|
|
- // Start background task
|
|
|
- let backgroundTask = await UIApplication.shared.beginBackgroundTask(withName: "Loop starting") { [weak self] in
|
|
|
- guard let self, let backgroundTask = self.backgroundTaskID else { return }
|
|
|
- Task {
|
|
|
- UIApplication.shared.endBackgroundTask(backgroundTask)
|
|
|
- }
|
|
|
- self.backgroundTaskID = .invalid
|
|
|
- }
|
|
|
- backgroundTaskID = backgroundTask
|
|
|
-
|
|
|
- // Set loop start time
|
|
|
- lastLoopStartDate = Date()
|
|
|
-
|
|
|
- // Calculate interval from previous loop
|
|
|
- let interval = await calculateLoopInterval()
|
|
|
-
|
|
|
- // Create initial loop stats record
|
|
|
- let loopStatRecord = LoopStats(
|
|
|
- start: lastLoopStartDate,
|
|
|
- loopStatus: "Starting",
|
|
|
- interval: interval
|
|
|
- )
|
|
|
-
|
|
|
- isLooping.send(true)
|
|
|
-
|
|
|
- return (loopStatRecord, backgroundTask)
|
|
|
- }
|
|
|
-
|
|
|
private func executeLoop(loopStatRecord: inout LoopStats) async throws {
|
|
|
try await determineBasal()
|
|
|
|
|
|
- // Handle open loop
|
|
|
- guard settings.closedLoop else {
|
|
|
- loopStatRecord.end = Date()
|
|
|
- loopStatRecord.duration = roundDouble((loopStatRecord.end! - loopStatRecord.start).timeInterval / 60, 2)
|
|
|
- loopStatRecord.loopStatus = "Success"
|
|
|
- await loopCompleted(loopStatRecord: loopStatRecord)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // Handle closed loop
|
|
|
- try await enactDetermination()
|
|
|
loopStatRecord.end = Date()
|
|
|
loopStatRecord.duration = roundDouble((loopStatRecord.end! - loopStatRecord.start).timeInterval / 60, 2)
|
|
|
loopStatRecord.loopStatus = "Success"
|
|
|
- await loopCompleted(loopStatRecord: loopStatRecord)
|
|
|
+
|
|
|
+ // Closed loop: also enact the determination
|
|
|
+ if settings.closedLoop {
|
|
|
+ try await enactDetermination()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private func calculateLoopInterval() async -> Double? {
|
|
|
+ private func calculateLoopInterval(loopStartDate: Date) async -> Double? {
|
|
|
do {
|
|
|
- return try await privateContext.perform {
|
|
|
+ return try await privateContext.perform { [weak self] in
|
|
|
+ guard let self else { return nil }
|
|
|
let requestStats = LoopStatRecord.fetchRequest() as NSFetchRequest<LoopStatRecord>
|
|
|
let sortStats = NSSortDescriptor(key: "end", ascending: false)
|
|
|
requestStats.sortDescriptors = [sortStats]
|
|
|
requestStats.fetchLimit = 1
|
|
|
let previousLoop = try self.privateContext.fetch(requestStats)
|
|
|
|
|
|
- if (previousLoop.first?.end ?? .distantFuture) < self.lastLoopStartDate {
|
|
|
+ if (previousLoop.first?.end ?? .distantFuture) < loopStartDate {
|
|
|
return self.roundDouble(
|
|
|
- (self.lastLoopStartDate - (previousLoop.first?.end ?? Date())).timeInterval / 60,
|
|
|
+ (loopStartDate - (previousLoop.first?.end ?? Date())).timeInterval / 60,
|
|
|
1
|
|
|
)
|
|
|
}
|
|
|
@@ -354,16 +363,13 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Loop exit point
|
|
|
- private func loopCompleted(error: Error? = nil, loopStatRecord: LoopStats) async {
|
|
|
+ /// Single exit point for loop — replaces the old `loopCompleted()`.
|
|
|
+ private func finalizeLoop(error: Error? = nil, loopStatRecord: LoopStats) async {
|
|
|
isLooping.send(false)
|
|
|
+ await loopGuard.finish()
|
|
|
|
|
|
if let error = error {
|
|
|
warning(.apsManager, "Loop failed with error: \(error)")
|
|
|
- if let backgroundTask = backgroundTaskID {
|
|
|
- await UIApplication.shared.endBackgroundTask(backgroundTask)
|
|
|
- backgroundTaskID = .invalid
|
|
|
- }
|
|
|
processError(error)
|
|
|
} else {
|
|
|
debug(.apsManager, "Loop succeeded")
|
|
|
@@ -376,12 +382,6 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
if settings.closedLoop {
|
|
|
await reportEnacted(wasEnacted: error == nil)
|
|
|
}
|
|
|
-
|
|
|
- // End of the BG tasks
|
|
|
- if let backgroundTask = backgroundTaskID {
|
|
|
- await UIApplication.shared.endBackgroundTask(backgroundTask)
|
|
|
- backgroundTaskID = .invalid
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
private func verifyStatus() -> Error? {
|
|
|
@@ -551,8 +551,6 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
return min(rounded, maxBolus)
|
|
|
}
|
|
|
|
|
|
- private var bolusReporter: DoseProgressReporter?
|
|
|
-
|
|
|
func enactBolus(amount: Double, isSMB: Bool, callback: ((Bool, String) -> Void)?) async {
|
|
|
if amount <= 0 {
|
|
|
return
|
|
|
@@ -625,8 +623,7 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
)
|
|
|
)
|
|
|
}
|
|
|
- bolusReporter?.removeObserver(self)
|
|
|
- bolusReporter = nil
|
|
|
+ await bolusProgressState.clear()
|
|
|
bolusProgress.send(nil)
|
|
|
}
|
|
|
|
|
|
@@ -1207,15 +1204,15 @@ final class BaseAPSManager: APSManager, Injectable {
|
|
|
}
|
|
|
|
|
|
private func createBolusReporter() {
|
|
|
- bolusReporter = pumpManager?.createBolusProgressReporter(reportingOn: processQueue)
|
|
|
- bolusReporter?.addObserver(self)
|
|
|
+ let reporter = pumpManager?.createBolusProgressReporter(reportingOn: processQueue)
|
|
|
+ Task { await bolusProgressState.setReporter(reporter, observer: self) }
|
|
|
}
|
|
|
|
|
|
private func clearBolusReporter() {
|
|
|
- bolusReporter?.removeObserver(self)
|
|
|
- bolusReporter = nil
|
|
|
- processQueue.asyncAfter(deadline: .now() + 0.5) {
|
|
|
- self.bolusProgress.send(nil)
|
|
|
+ Task { [weak self] in
|
|
|
+ await self?.bolusProgressState.clear()
|
|
|
+ try? await Task.sleep(for: .milliseconds(500))
|
|
|
+ self?.bolusProgress.send(nil)
|
|
|
}
|
|
|
}
|
|
|
}
|