From b1084d74bb9753ebcee2ca2f2fb56e6424ea2868 Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:43:19 +0100 Subject: [PATCH 1/6] basic implementation of a mechanism that adds services to a running group --- Sources/ServiceLifecycle/ServiceGroup.swift | 144 +++++-- Tests/ServiceLifecycleTests/MockService.swift | 71 ++++ .../ServiceGroupAddServiceTests.swift | 397 ++++++++++++++++++ .../ServiceGroupTests.swift | 67 --- 4 files changed, 586 insertions(+), 93 deletions(-) create mode 100644 Tests/ServiceLifecycleTests/MockService.swift create mode 100644 Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 853cc6d..eb931e9 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -14,6 +14,7 @@ import Logging import UnixSignals +import AsyncAlgorithms /// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services. public actor ServiceGroup: Sendable, Service { @@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service { case initial(services: [ServiceGroupConfiguration.ServiceConfiguration]) /// The state once ``ServiceGroup/run()`` has been called. case running( - gracefulShutdownStreamContinuation: AsyncStream.Continuation + gracefulShutdownStreamContinuation: AsyncStream.Continuation, + addedServiceChannel: AsyncChannel ) /// The state once ``ServiceGroup/run()`` has finished. case finished @@ -106,6 +108,37 @@ public actor ServiceGroup: Sendable, Service { self.maximumCancellationDuration = configuration._maximumCancellationDuration } + /// Adds a service to the group. + /// + /// If the group is currently running, the added service will be started immediately. + /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. + /// - Parameters: + /// - serviceConfiguration: The service configuration to add. + public func addService(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async { + switch self.state { + case var .initial(services: services): + self.state = .initial(services: []) + services.append(serviceConfiguration) + self.state = .initial(services: services) + + case .running(_, let addedServiceChannel): + await addedServiceChannel.send(serviceConfiguration) + + case .finished: + return + } + } + + /// Adds a service to the group. + /// + /// If the group is currently running, the added service will be started immediately. + /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. + /// - Parameters: + /// - service: The service to add. + public func addService(_ service: any Service) async { + await self.addService(ServiceGroupConfiguration.ServiceConfiguration(service: service)) + } + /// Runs all the services by spinning up a child task per service. /// Furthermore, this method sets up the correct signal handlers /// for graceful shutdown. @@ -128,16 +161,19 @@ public actor ServiceGroup: Sendable, Service { } let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self) + let addedServiceChannel = AsyncChannel() self.state = .running( - gracefulShutdownStreamContinuation: gracefulShutdownContinuation + gracefulShutdownStreamContinuation: gracefulShutdownContinuation, + addedServiceChannel: addedServiceChannel ) var potentialError: Error? do { try await self._run( services: &services, - gracefulShutdownStream: gracefulShutdownStream + gracefulShutdownStream: gracefulShutdownStream, + addedServiceChannel: addedServiceChannel ) } catch { potentialError = error @@ -173,7 +209,7 @@ public actor ServiceGroup: Sendable, Service { self.state = .finished return - case .running(let gracefulShutdownStreamContinuation): + case .running(let gracefulShutdownStreamContinuation, _): // We cannot transition to shuttingDown here since we are signalling over to the task // that runs `run`. This task is responsible for transitioning to shuttingDown since // there might be multiple signals racing to trigger it @@ -189,7 +225,7 @@ public actor ServiceGroup: Sendable, Service { } } - private enum ChildTaskResult { + fileprivate enum ChildTaskResult { case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int) case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error) case signalCaught(UnixSignal) @@ -202,7 +238,8 @@ public actor ServiceGroup: Sendable, Service { private func _run( services: inout [ServiceGroupConfiguration.ServiceConfiguration], - gracefulShutdownStream: AsyncStream + gracefulShutdownStream: AsyncStream, + addedServiceChannel: AsyncChannel ) async throws { self.logger.debug( "Starting service lifecycle", @@ -280,25 +317,11 @@ public actor ServiceGroup: Sendable, Service { let gracefulShutdownManager = GracefulShutdownManager() gracefulShutdownManagers.append(gracefulShutdownManager) - // This must be addTask and not addTaskUnlessCancelled - // because we must run all the services for the below logic to work. - group.addTask { - return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { - do { - try await serviceConfiguration.service.run() - return .serviceFinished(service: serviceConfiguration, index: index) - } catch { - return .serviceThrew(service: serviceConfiguration, index: index, error: error) - } - } - } - } - - group.addTask { - // This child task is waiting forever until the group gets cancelled. - let (stream, _) = AsyncStream.makeStream(of: Void.self) - await stream.first { _ in true } - return .cancellationCaught + group.addServiceTask( + serviceConfiguration, + gracefulShutdownManager: gracefulShutdownManager, + index: index + ) } // We are storing the services in an optional array now. When a slot in the array is @@ -310,6 +333,49 @@ public actor ServiceGroup: Sendable, Service { "We did not create a graceful shutdown manager per service" ) + var taskGroupThatMustNotEscape = group + group.addTask { + // This is the task that listens to added services and starts them while the group is running + + await withTaskCancellationHandler { + // Channel will be finished in `shutdownGracefully`, we must not add services after graceful shutdown has started + for await serviceConfiguration in addedServiceChannel { + self.logger.debug( + "Starting added service", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)" + ] + ) + + let gracefulShutdownManager = GracefulShutdownManager() + gracefulShutdownManagers.append(gracefulShutdownManager) + services.append(serviceConfiguration) + + precondition( + services.count == gracefulShutdownManagers.count, + "Mismatch between services and graceful shutdown managers" + ) + + taskGroupThatMustNotEscape.addServiceTask( + serviceConfiguration, + gracefulShutdownManager: gracefulShutdownManager, + index: services.count - 1 + ) + } + } onCancel: { + addedServiceChannel.finish() + } + + return .gracefulShutdownFinished + } + + group.addTask { + // This child task is waiting forever until the group gets cancelled. + let (stream, _) = AsyncStream.makeStream(of: Void.self) + await stream.first { _ in true } + return .cancellationCaught + } + // We are going to wait for any of the services to finish or // the signal sequence to throw an error. while !group.isEmpty { @@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service { group: inout ThrowingTaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] ) async throws { - guard case .running = self.state else { + guard case let .running(_, addedServiceChannel) = self.state else { fatalError("Unexpected state") } + // Signal to stop adding new services (it is important that no new services are added after this point) + addedServiceChannel.finish() + if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *), let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration { @@ -779,6 +848,29 @@ public actor ServiceGroup: Sendable, Service { } } +extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGroup.ChildTaskResult { + mutating func addServiceTask( + _ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration, + gracefulShutdownManager: GracefulShutdownManager, + index: Int + ) { + // This must be addTask and not addTaskUnlessCancelled + // because we must run all the services for the shutdown logic to work. + self.addTask { + return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { + do { + try await serviceConfiguration.service.run() + return .serviceFinished(service: serviceConfiguration, index: index) + } catch { + return .serviceThrew(service: serviceConfiguration, index: index, error: error) + } + } + } + + } + +} + // This should be removed once we support Swift 5.9+ extension AsyncStream { fileprivate static func makeStream( diff --git a/Tests/ServiceLifecycleTests/MockService.swift b/Tests/ServiceLifecycleTests/MockService.swift new file mode 100644 index 0000000..f333ceb --- /dev/null +++ b/Tests/ServiceLifecycleTests/MockService.swift @@ -0,0 +1,71 @@ +import ServiceLifecycle + +actor MockService: Service, CustomStringConvertible { + enum Event { + case run + case runPing + case runCancelled + case shutdownGracefully + } + + let events: AsyncStream + internal private(set) var hasRun: Bool = false + + private let eventsContinuation: AsyncStream.Continuation + + private var runContinuation: CheckedContinuation? + + nonisolated let description: String + + private let pings: AsyncStream + private nonisolated let pingContinuation: AsyncStream.Continuation + + init( + description: String + ) { + var eventsContinuation: AsyncStream.Continuation! + self.events = AsyncStream { eventsContinuation = $0 } + self.eventsContinuation = eventsContinuation! + + var pingContinuation: AsyncStream.Continuation! + self.pings = AsyncStream { pingContinuation = $0 } + self.pingContinuation = pingContinuation! + + self.description = description + } + + func run() async throws { + self.hasRun = true + + try await withTaskCancellationHandler { + try await withGracefulShutdownHandler { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + self.eventsContinuation.yield(.run) + for await _ in self.pings { + self.eventsContinuation.yield(.runPing) + } + } + + try await withCheckedThrowingContinuation { + self.runContinuation = $0 + } + + group.cancelAll() + } + } onGracefulShutdown: { + self.eventsContinuation.yield(.shutdownGracefully) + } + } onCancel: { + self.eventsContinuation.yield(.runCancelled) + } + } + + func resumeRunContinuation(with result: Result) { + self.runContinuation?.resume(with: result) + } + + nonisolated func sendPing() { + self.pingContinuation.yield() + } +} diff --git a/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift new file mode 100644 index 0000000..e67f3bd --- /dev/null +++ b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift @@ -0,0 +1,397 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftServiceLifecycle open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import ServiceLifecycle +import UnixSignals +import XCTest + +private struct ExampleError: Error, Hashable {} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class ServiceGroupAddServiceTests: XCTestCase { + + func testAddService_whenNotRunning() async { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup() + await serviceGroup.addService(mockService) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + group.cancelAll() + await XCTAsyncAssertEqual(await eventIterator.next(), .runCancelled) + + await mockService.resumeRunContinuation(with: .success(())) + } + + } + + func testAddService_whenRunning() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + var eventIterator2 = mockService2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addService(mockService2) + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + await mockService1.resumeRunContinuation(with: .success(())) + + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + await mockService2.resumeRunContinuation(with: .success(())) + } + } + + func testAddService_whenShuttingDown() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + await serviceGroup.addService(mockService2) + + await mockService1.resumeRunContinuation(with: .success(())) + } + + await XCTAsyncAssertEqual(await mockService2.hasRun, false) + } + + func testAddService_whenCancelling() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + group.cancelAll() + + await XCTAsyncAssertEqual(await eventIterator1.next(), .runCancelled) + await serviceGroup.addService(mockService2) + + await mockService1.resumeRunContinuation(with: .success(())) + } + + await XCTAsyncAssertEqual(await mockService2.hasRun, false) + } + + func testRun_whenAddedServiceExitsEarly_andIgnore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [], + gracefulShutdownSignals: [.sigalrm] + ) + + await serviceGroup.addService(.init(service: service1, successTerminationBehavior: .ignore)) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addService(.init(service: service2, failureTerminationBehavior: .ignore)) + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + await service1.resumeRunContinuation(with: .success(())) + + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testRun_whenAddedServiceExitsEarly_andShutdownGracefully() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1) + ] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addService( + .init(service: service2, successTerminationBehavior: .gracefullyShutdownGroup) + ) + await serviceGroup.addService(.init(service: service3)) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await service2.resumeRunContinuation(with: .success(())) + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that the remaining two are still running + service1.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // Waiting to see that the remaining is still running + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testRun_whenAddedServiceThrows() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var service1EventIterator = service1.events.makeAsyncIterator() + var service2EventIterator = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await service1EventIterator.next(), .run) + await serviceGroup.addService(service2) + + await XCTAsyncAssertEqual(await service2EventIterator.next(), .run) + + // Throwing from service2 here and expect that service1 gets cancelled + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + await XCTAsyncAssertEqual(await service1EventIterator.next(), .runCancelled) + await service1.resumeRunContinuation(with: .success(())) + + try await XCTAsyncAssertThrowsError(await group.next()) { + XCTAssertTrue($0 is ExampleError) + } + } + } + + func testGracefulShutdownOrdering_withAddedServices() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addService(service2) + await serviceGroup.addService(service3) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) // ignore-unacceptable-language + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the middle service + await service2.resumeRunContinuation(with: .success(())) + + // The first service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testGracefulShutdownOrdering_whenAddedServiceExits() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addService(service2) + await serviceGroup.addService(service3) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) // ignore-unacceptable-language + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + // The middle service should now receive a cancellation + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + + // Let's exit from the first service + await service2.resumeRunContinuation(with: .success(())) + } + } + + // MARK: - Helpers + + private func makeServiceGroup( + services: [ServiceGroupConfiguration.ServiceConfiguration] = [], + gracefulShutdownSignals: [UnixSignal] = .init(), + cancellationSignals: [UnixSignal] = .init(), + maximumGracefulShutdownDuration: Duration? = nil, + maximumCancellationDuration: Duration? = .seconds(5) + ) -> ServiceGroup { + var logger = Logger(label: "Tests") + logger.logLevel = .debug + + var configuration = ServiceGroupConfiguration( + services: services, + gracefulShutdownSignals: gracefulShutdownSignals, + cancellationSignals: cancellationSignals, + logger: logger + ) + configuration.maximumGracefulShutdownDuration = maximumGracefulShutdownDuration + configuration.maximumCancellationDuration = maximumCancellationDuration + return .init( + configuration: configuration + ) + } +} diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index b536d2a..0120251 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -19,73 +19,6 @@ import XCTest private struct ExampleError: Error, Hashable {} -private actor MockService: Service, CustomStringConvertible { - enum Event { - case run - case runPing - case runCancelled - case shutdownGracefully - } - - let events: AsyncStream - - private let eventsContinuation: AsyncStream.Continuation - - private var runContinuation: CheckedContinuation? - - nonisolated let description: String - - private let pings: AsyncStream - private nonisolated let pingContinuation: AsyncStream.Continuation - - init( - description: String - ) { - var eventsContinuation: AsyncStream.Continuation! - self.events = AsyncStream { eventsContinuation = $0 } - self.eventsContinuation = eventsContinuation! - - var pingContinuation: AsyncStream.Continuation! - self.pings = AsyncStream { pingContinuation = $0 } - self.pingContinuation = pingContinuation! - - self.description = description - } - - func run() async throws { - try await withTaskCancellationHandler { - try await withGracefulShutdownHandler { - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - self.eventsContinuation.yield(.run) - for await _ in self.pings { - self.eventsContinuation.yield(.runPing) - } - } - - try await withCheckedThrowingContinuation { - self.runContinuation = $0 - } - - group.cancelAll() - } - } onGracefulShutdown: { - self.eventsContinuation.yield(.shutdownGracefully) - } - } onCancel: { - self.eventsContinuation.yield(.runCancelled) - } - } - - func resumeRunContinuation(with result: Result) { - self.runContinuation?.resume(with: result) - } - - nonisolated func sendPing() { - self.pingContinuation.yield() - } -} - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ServiceGroupTests: XCTestCase { func testRun_whenAlreadyRunning() async throws { From 7dce0c3585e32f16bcc60c3614c2b7760f63e4c0 Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:50:20 +0100 Subject: [PATCH 2/6] rename --- Sources/ServiceLifecycle/ServiceGroup.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index eb931e9..d73b3f2 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -333,7 +333,7 @@ public actor ServiceGroup: Sendable, Service { "We did not create a graceful shutdown manager per service" ) - var taskGroupThatMustNotEscape = group + var _unownedTaskGroupHandledCarefully = group group.addTask { // This is the task that listens to added services and starts them while the group is running @@ -356,7 +356,7 @@ public actor ServiceGroup: Sendable, Service { "Mismatch between services and graceful shutdown managers" ) - taskGroupThatMustNotEscape.addServiceTask( + _unownedTaskGroupHandledCarefully.addServiceTask( serviceConfiguration, gracefulShutdownManager: gracefulShutdownManager, index: services.count - 1 From 84ccf188d6ace6efd8e70e386668215596840dd5 Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Wed, 5 Mar 2025 19:51:31 +0100 Subject: [PATCH 3/6] shovel added services into task group results --- Sources/ServiceLifecycle/ServiceGroup.swift | 91 ++++++++++++--------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index d73b3f2..60cb201 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -234,6 +234,7 @@ public actor ServiceGroup: Sendable, Service { case gracefulShutdownFinished case gracefulShutdownTimedOut case cancellationCaught + case newServiceAdded(ServiceGroupConfiguration.ServiceConfiguration) } private func _run( @@ -333,42 +334,6 @@ public actor ServiceGroup: Sendable, Service { "We did not create a graceful shutdown manager per service" ) - var _unownedTaskGroupHandledCarefully = group - group.addTask { - // This is the task that listens to added services and starts them while the group is running - - await withTaskCancellationHandler { - // Channel will be finished in `shutdownGracefully`, we must not add services after graceful shutdown has started - for await serviceConfiguration in addedServiceChannel { - self.logger.debug( - "Starting added service", - metadata: [ - self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)" - ] - ) - - let gracefulShutdownManager = GracefulShutdownManager() - gracefulShutdownManagers.append(gracefulShutdownManager) - services.append(serviceConfiguration) - - precondition( - services.count == gracefulShutdownManagers.count, - "Mismatch between services and graceful shutdown managers" - ) - - _unownedTaskGroupHandledCarefully.addServiceTask( - serviceConfiguration, - gracefulShutdownManager: gracefulShutdownManager, - index: services.count - 1 - ) - } - } onCancel: { - addedServiceChannel.finish() - } - - return .gracefulShutdownFinished - } - group.addTask { // This child task is waiting forever until the group gets cancelled. let (stream, _) = AsyncStream.makeStream(of: Void.self) @@ -376,12 +341,41 @@ public actor ServiceGroup: Sendable, Service { return .cancellationCaught } + // Adds a task that listens to added services and funnels them into the task group + group.addAddedServiceListenerTask(addedServiceChannel) + // We are going to wait for any of the services to finish or // the signal sequence to throw an error. while !group.isEmpty { - let result: ChildTaskResult? = try await group.next() + let nextEvent = try await group.next() + + switch nextEvent { + case .newServiceAdded(let serviceConfiguration): + self.logger.debug( + "Starting added service", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)" + ] + ) + + let gracefulShutdownManager = GracefulShutdownManager() + gracefulShutdownManagers.append(gracefulShutdownManager) + services.append(serviceConfiguration) + + precondition( + services.count == gracefulShutdownManagers.count, + "Mismatch between services and graceful shutdown managers" + ) + + group.addServiceTask( + serviceConfiguration, + gracefulShutdownManager: gracefulShutdownManager, + index: services.count - 1 + ) + + // Each listener task can only handle a single added service, so we must add a new listener + group.addAddedServiceListenerTask(addedServiceChannel) - switch result { case .serviceFinished(let service, let index): if group.isCancelled { // The group is cancelled and we expect all services to finish @@ -786,6 +780,11 @@ public actor ServiceGroup: Sendable, Service { // We are going to continue the result loop since we have to wait for our service // to finish. break + + case .newServiceAdded: + // TBD: How do we treat added services during graceful shutdown? + // Currently, we ignore them - but we make sure that `run` is never called + break } } } @@ -866,9 +865,23 @@ extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGr } } } - } + mutating func addAddedServiceListenerTask(_ channel: AsyncChannel) { + self.addTask { + return await withTaskCancellationHandler { + var iterator = channel.makeAsyncIterator() + if let addedService = await iterator.next() { + return .newServiceAdded(addedService) + } + + return .gracefulShutdownFinished + } onCancel: { + // Without this we can get stuck in `addService` if the group + channel.finish() + } + } + } } // This should be removed once we support Swift 5.9+ From 25133385c639f08c2c6c0f430ef75697f96c8c20 Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Thu, 6 Mar 2025 08:30:11 +0100 Subject: [PATCH 4/6] renamed to addServiceUnlessShutdown and minor touch-ups --- Sources/ServiceLifecycle/ServiceGroup.swift | 53 +++++++++++-------- .../ServiceGroupAddServiceTests.swift | 26 ++++----- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 60cb201..ff39595 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -108,13 +108,13 @@ public actor ServiceGroup: Sendable, Service { self.maximumCancellationDuration = configuration._maximumCancellationDuration } - /// Adds a service to the group. + /// Adds a new service to the group. /// /// If the group is currently running, the added service will be started immediately. /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. /// - Parameters: /// - serviceConfiguration: The service configuration to add. - public func addService(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async { + public func addServiceUnlessShutdown(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async { switch self.state { case var .initial(services: services): self.state = .initial(services: []) @@ -125,18 +125,19 @@ public actor ServiceGroup: Sendable, Service { await addedServiceChannel.send(serviceConfiguration) case .finished: + // Since this is a best effort operation we don't have to do anything here return } } - /// Adds a service to the group. + /// Adds a new service to the group. /// /// If the group is currently running, the added service will be started immediately. /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. /// - Parameters: /// - service: The service to add. - public func addService(_ service: any Service) async { - await self.addService(ServiceGroupConfiguration.ServiceConfiguration(service: service)) + public func addServiceUnlessShutdown(_ service: any Service) async { + await self.addServiceUnlessShutdown(ServiceGroupConfiguration.ServiceConfiguration(service: service)) } /// Runs all the services by spinning up a child task per service. @@ -225,7 +226,7 @@ public actor ServiceGroup: Sendable, Service { } } - fileprivate enum ChildTaskResult { + private enum ChildTaskResult { case serviceFinished(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int) case serviceThrew(service: ServiceGroupConfiguration.ServiceConfiguration, index: Int, error: any Error) case signalCaught(UnixSignal) @@ -318,8 +319,9 @@ public actor ServiceGroup: Sendable, Service { let gracefulShutdownManager = GracefulShutdownManager() gracefulShutdownManagers.append(gracefulShutdownManager) - group.addServiceTask( - serviceConfiguration, + self.addServiceTask( + group: &group, + service: serviceConfiguration, gracefulShutdownManager: gracefulShutdownManager, index: index ) @@ -342,14 +344,14 @@ public actor ServiceGroup: Sendable, Service { } // Adds a task that listens to added services and funnels them into the task group - group.addAddedServiceListenerTask(addedServiceChannel) + self.addAddedServiceListenerTask(group: &group, channel: addedServiceChannel) // We are going to wait for any of the services to finish or // the signal sequence to throw an error. while !group.isEmpty { - let nextEvent = try await group.next() + let result: ChildTaskResult? = try await group.next() - switch nextEvent { + switch result { case .newServiceAdded(let serviceConfiguration): self.logger.debug( "Starting added service", @@ -367,14 +369,18 @@ public actor ServiceGroup: Sendable, Service { "Mismatch between services and graceful shutdown managers" ) - group.addServiceTask( - serviceConfiguration, + self.addServiceTask( + group: &group, + service: serviceConfiguration, gracefulShutdownManager: gracefulShutdownManager, index: services.count - 1 ) // Each listener task can only handle a single added service, so we must add a new listener - group.addAddedServiceListenerTask(addedServiceChannel) + self.addAddedServiceListenerTask( + group: &group, + channel: addedServiceChannel + ) case .serviceFinished(let service, let index): if group.isCancelled { @@ -782,8 +788,7 @@ public actor ServiceGroup: Sendable, Service { break case .newServiceAdded: - // TBD: How do we treat added services during graceful shutdown? - // Currently, we ignore them - but we make sure that `run` is never called + // Since adding services is best effort, we simply ignore this break } } @@ -845,17 +850,16 @@ public actor ServiceGroup: Sendable, Service { cancellationTimeoutTask = nil } } -} -extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGroup.ChildTaskResult { - mutating func addServiceTask( - _ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration, + private func addServiceTask( + group: inout ThrowingTaskGroup, + service serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration, gracefulShutdownManager: GracefulShutdownManager, index: Int ) { // This must be addTask and not addTaskUnlessCancelled // because we must run all the services for the shutdown logic to work. - self.addTask { + group.addTask { return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { do { try await serviceConfiguration.service.run() @@ -867,8 +871,11 @@ extension ThrowingTaskGroup where Failure == Error, ChildTaskResult == ServiceGr } } - mutating func addAddedServiceListenerTask(_ channel: AsyncChannel) { - self.addTask { + private func addAddedServiceListenerTask( + group: inout ThrowingTaskGroup, + channel: AsyncChannel + ) { + group.addTask { return await withTaskCancellationHandler { var iterator = channel.makeAsyncIterator() if let addedService = await iterator.next() { diff --git a/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift index e67f3bd..9bed364 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift @@ -25,7 +25,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { func testAddService_whenNotRunning() async { let mockService = MockService(description: "Service1") let serviceGroup = self.makeServiceGroup() - await serviceGroup.addService(mockService) + await serviceGroup.addServiceUnlessShutdown(mockService) await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -59,7 +59,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { var eventIterator2 = mockService2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator1.next(), .run) - await serviceGroup.addService(mockService2) + await serviceGroup.addServiceUnlessShutdown(mockService2) await XCTAsyncAssertEqual(await eventIterator2.next(), .run) await mockService1.resumeRunContinuation(with: .success(())) @@ -87,7 +87,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { await serviceGroup.triggerGracefulShutdown() await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) - await serviceGroup.addService(mockService2) + await serviceGroup.addServiceUnlessShutdown(mockService2) await mockService1.resumeRunContinuation(with: .success(())) } @@ -113,7 +113,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { group.cancelAll() await XCTAsyncAssertEqual(await eventIterator1.next(), .runCancelled) - await serviceGroup.addService(mockService2) + await serviceGroup.addServiceUnlessShutdown(mockService2) await mockService1.resumeRunContinuation(with: .success(())) } @@ -129,7 +129,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { gracefulShutdownSignals: [.sigalrm] ) - await serviceGroup.addService(.init(service: service1, successTerminationBehavior: .ignore)) + await serviceGroup.addServiceUnlessShutdown(.init(service: service1, successTerminationBehavior: .ignore)) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { @@ -140,7 +140,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { var eventIterator2 = service2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator1.next(), .run) - await serviceGroup.addService(.init(service: service2, failureTerminationBehavior: .ignore)) + await serviceGroup.addServiceUnlessShutdown(.init(service: service2, failureTerminationBehavior: .ignore)) await XCTAsyncAssertEqual(await eventIterator2.next(), .run) await service1.resumeRunContinuation(with: .success(())) @@ -172,10 +172,10 @@ final class ServiceGroupAddServiceTests: XCTestCase { var eventIterator1 = service1.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator1.next(), .run) - await serviceGroup.addService( + await serviceGroup.addServiceUnlessShutdown( .init(service: service2, successTerminationBehavior: .gracefullyShutdownGroup) ) - await serviceGroup.addService(.init(service: service3)) + await serviceGroup.addServiceUnlessShutdown(.init(service: service3)) var eventIterator2 = service2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator2.next(), .run) @@ -223,7 +223,7 @@ final class ServiceGroupAddServiceTests: XCTestCase { var service1EventIterator = service1.events.makeAsyncIterator() var service2EventIterator = service2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await service1EventIterator.next(), .run) - await serviceGroup.addService(service2) + await serviceGroup.addServiceUnlessShutdown(service2) await XCTAsyncAssertEqual(await service2EventIterator.next(), .run) @@ -256,8 +256,8 @@ final class ServiceGroupAddServiceTests: XCTestCase { var eventIterator1 = service1.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator1.next(), .run) - await serviceGroup.addService(service2) - await serviceGroup.addService(service3) + await serviceGroup.addServiceUnlessShutdown(service2) + await serviceGroup.addServiceUnlessShutdown(service3) var eventIterator2 = service2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator2.next(), .run) @@ -324,8 +324,8 @@ final class ServiceGroupAddServiceTests: XCTestCase { var eventIterator1 = service1.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator1.next(), .run) - await serviceGroup.addService(service2) - await serviceGroup.addService(service3) + await serviceGroup.addServiceUnlessShutdown(service2) + await serviceGroup.addServiceUnlessShutdown(service3) var eventIterator2 = service2.events.makeAsyncIterator() await XCTAsyncAssertEqual(await eventIterator2.next(), .run) From a837a30dea3e522f084386c9af7749c0ec0a446b Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Thu, 6 Mar 2025 08:59:12 +0100 Subject: [PATCH 5/6] improved comment about channel finish --- Sources/ServiceLifecycle/ServiceGroup.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index ff39595..580313a 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -884,7 +884,8 @@ public actor ServiceGroup: Sendable, Service { return .gracefulShutdownFinished } onCancel: { - // Without this we can get stuck in `addService` if the group + // Once the group is cancelled we will no longer read from the channel. + // This will resume any suspended producer in `addServiceUnlessShutdown`. channel.finish() } } From 10ac76e87898175ec1bb338520c3418178eefcac Mon Sep 17 00:00:00 2001 From: Simon Leeb <52261246+sliemeobn@users.noreply.github.com> Date: Thu, 6 Mar 2025 12:42:46 +0100 Subject: [PATCH 6/6] added missing license header --- Tests/ServiceLifecycleTests/MockService.swift | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/Tests/ServiceLifecycleTests/MockService.swift b/Tests/ServiceLifecycleTests/MockService.swift index f333ceb..ecef7dc 100644 --- a/Tests/ServiceLifecycleTests/MockService.swift +++ b/Tests/ServiceLifecycleTests/MockService.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftServiceLifecycle open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + import ServiceLifecycle actor MockService: Service, CustomStringConvertible { @@ -9,7 +23,7 @@ actor MockService: Service, CustomStringConvertible { } let events: AsyncStream - internal private(set) var hasRun: Bool = false + private(set) var hasRun: Bool = false private let eventsContinuation: AsyncStream.Continuation @@ -24,18 +38,18 @@ actor MockService: Service, CustomStringConvertible { description: String ) { var eventsContinuation: AsyncStream.Continuation! - self.events = AsyncStream { eventsContinuation = $0 } + events = AsyncStream { eventsContinuation = $0 } self.eventsContinuation = eventsContinuation! var pingContinuation: AsyncStream.Continuation! - self.pings = AsyncStream { pingContinuation = $0 } + pings = AsyncStream { pingContinuation = $0 } self.pingContinuation = pingContinuation! self.description = description } func run() async throws { - self.hasRun = true + hasRun = true try await withTaskCancellationHandler { try await withGracefulShutdownHandler { @@ -62,10 +76,10 @@ actor MockService: Service, CustomStringConvertible { } func resumeRunContinuation(with result: Result) { - self.runContinuation?.resume(with: result) + runContinuation?.resume(with: result) } nonisolated func sendPing() { - self.pingContinuation.yield() + pingContinuation.yield() } }