Skip to content

Commit c9b2a6b

Browse files
authored
Implement async sequence output at (#178)
Feature parity with Combine - https://developer.apple.com/documentation/combine/publisher/output(at:) 1. Implement `output(at:)` similar to `collect`. 2. Document. 3. Test. 4. Extract some repeated test helpers to avoid copying them.
1 parent d436bb6 commit c9b2a6b

File tree

8 files changed

+136
-54
lines changed

8 files changed

+136
-54
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//
2+
// OutputSequence.swift
3+
// Afluent
4+
//
5+
// Created by Roman Temchenko on 2025-03-05.
6+
//
7+
8+
import Foundation
9+
10+
extension AsyncSequences {
11+
public struct OutputAt<Upstream: AsyncSequence & Sendable>: AsyncSequence, Sendable {
12+
public typealias Element = Upstream.Element
13+
let upstream: Upstream
14+
let index: Int
15+
16+
public struct AsyncIterator: AsyncIteratorProtocol {
17+
var upstreamIterator: Upstream.AsyncIterator
18+
let index: Int
19+
var nextIndex = 0
20+
21+
public mutating func next() async throws -> Element? {
22+
guard nextIndex <= index else { return nil }
23+
while let next = try await upstreamIterator.next() {
24+
if nextIndex == index {
25+
nextIndex &+= 1
26+
return next
27+
}
28+
nextIndex &+= 1
29+
}
30+
return nil
31+
}
32+
}
33+
34+
public func makeAsyncIterator() -> AsyncIterator {
35+
AsyncIterator(upstreamIterator: upstream.makeAsyncIterator(), index: index)
36+
}
37+
}
38+
39+
}
40+
41+
extension AsyncSequence where Self: Sendable {
42+
43+
/// Returns a sequence containing a specific indexed element.
44+
/// If the sequence finishes normally or with an error before emitting the specified element, then the sequence doesn’t produce any elements.
45+
/// - Parameter index: The index that indicates the element needed.
46+
/// - Returns: A sequence containing a specific indexed element.
47+
public func output(at index: Int) -> AsyncSequences.OutputAt<Self> {
48+
AsyncSequences.OutputAt(upstream: self, index: index)
49+
}
50+
51+
}

Tests/AfluentTests/SequenceTests/CollectSequenceTests.swift

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,11 @@ struct CollectSequenceTests {
3737
}
3838

3939
@Test func testCollectWithSequenceThrowingError() async throws {
40-
enum TestError: Error, Equatable {
41-
case someError
42-
}
43-
4440
let errorSequence = AsyncThrowingStream<Int, Error> { continuation in
45-
continuation.finish(throwing: TestError.someError)
41+
continuation.finish(throwing: GeneralError.e1)
4642
}
47-
do {
43+
await #expect(throws: GeneralError.e1, performing: {
4844
_ = try await errorSequence.collect().first()
49-
Issue.record("Collect should throw the error encountered in the sequence")
50-
} catch {
51-
#expect(error as? TestError == .someError, "Collect should throw the correct error")
52-
}
53-
}
54-
}
55-
56-
extension Array where Element: Sendable {
57-
fileprivate var async: AsyncStream<Element> {
58-
AsyncStream { continuation in
59-
for element in self {
60-
continuation.yield(element)
61-
}
62-
continuation.finish()
63-
}
45+
})
6446
}
6547
}

Tests/AfluentTests/SequenceTests/DelaySequenceTests.swift

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,3 @@ struct DelaySequenceTests {
130130
}
131131
}
132132
}
133-
134-
extension Array where Element: Sendable {
135-
fileprivate var async: AsyncStream<Element> {
136-
AsyncStream { continuation in
137-
for element in self {
138-
continuation.yield(element)
139-
}
140-
continuation.finish()
141-
}
142-
}
143-
}

Tests/AfluentTests/SequenceTests/HandleEventsSequenceTests.swift

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,3 @@ struct HandleEventsSequenceTests {
159159
}
160160
}
161161
}
162-
163-
extension Array where Element: Sendable {
164-
fileprivate var async: AsyncStream<Element> {
165-
AsyncStream { continuation in
166-
for element in self {
167-
continuation.yield(element)
168-
}
169-
continuation.finish()
170-
}
171-
}
172-
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//
2+
// OutputSequenceTests.swift
3+
// Afluent
4+
//
5+
// Created by Roman Temchenko on 2025-03-07.
6+
//
7+
8+
import Afluent
9+
import Foundation
10+
import Testing
11+
import ConcurrencyExtras
12+
13+
struct OutputSequenceTests {
14+
15+
@Test func testOutputAt() async throws {
16+
let emptySequence = [0, 3, 5].async
17+
let result = try await emptySequence.output(at: 1).first()
18+
#expect(result == 3)
19+
}
20+
21+
@Test func testOutputAtWithEmptySequence() async throws {
22+
let emptySequence = [Int]().async
23+
let result = try await emptySequence.output(at: 0).first()
24+
try #require(result == nil)
25+
}
26+
27+
@Test func testOutputAtOutOfBounds() async throws {
28+
let emptySequence = [0, 3, 5].async
29+
let result = try await emptySequence.output(at: 5).first()
30+
#expect(result == nil)
31+
}
32+
33+
@Test func testOutputAtWithSequenceThrowingError() async throws {
34+
let errorSequence = AsyncThrowingStream<Int, Error> { continuation in
35+
continuation.finish(throwing: GeneralError.e1)
36+
}
37+
await #expect(throws: GeneralError.e1, performing: {
38+
_ = try await errorSequence.output(at: 0).first()
39+
})
40+
}
41+
42+
@Test func testOutputAtCancellation() async throws {
43+
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Int.self)
44+
45+
let task = Task {
46+
let result = try await stream.output(at: 4).first()
47+
#expect(result == nil)
48+
return result
49+
}
50+
51+
continuation.yield(0)
52+
task.cancel()
53+
54+
// Give task cancellation time to propagate.
55+
await Task.megaYield()
56+
57+
continuation.finish(throwing: GeneralError.e1)
58+
59+
let result = try await task.value
60+
#expect(result == nil)
61+
}
62+
63+
}

Tests/AfluentTests/SequenceTests/ShareSequenceTests.swift

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -326,17 +326,6 @@ func throwOn<T: Equatable>(_ toThrowOn: T, _ value: T) throws -> T {
326326
return value
327327
}
328328

329-
extension Array where Element: Sendable {
330-
fileprivate var async: AsyncStream<Element> {
331-
AsyncStream<Element> {
332-
for item in self {
333-
$0.yield(item)
334-
}
335-
$0.finish()
336-
}
337-
}
338-
}
339-
340329
extension RangeReplaceableCollection {
341330
fileprivate init<Source: AsyncSequence>(_ source: Source) async rethrows
342331
where Source.Element == Element {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//
2+
// ArrayExtensions.swift
3+
// Afluent
4+
//
5+
// Created by Roman Temchenko on 2025-03-07.
6+
//
7+
8+
import Foundation
9+
10+
extension Array where Element: Sendable {
11+
var async: AsyncStream<Element> {
12+
AsyncStream<Element> {
13+
for item in self {
14+
$0.yield(item)
15+
}
16+
$0.finish()
17+
}
18+
}
19+
}

0 commit comments

Comments
 (0)