Skip to content
181 changes: 181 additions & 0 deletions Evolution/00nn-flatMapLatest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# FlatMapLatest

* Proposal: [SAA-00nn](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/00nn-flatMapLatest.md)
* Author(s): [Peter Friese](https://github.com/peterfriese)
* Status: **Proposed**
* Implementation:
[
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/FlatMapLatest/AsyncFlatMapLatestSequence.swift) |
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestFlatMapLatest.swift)
]
* Decision Notes:
* Bugs:

## Introduction

When transforming elements of an asynchronous sequence into new asynchronous sequences, there are cases where only the results from the most recent transformation are relevant, and previous transformations should be abandoned. This is particularly common in reactive user interfaces where rapid user input triggers asynchronous operations, but only the result of the latest operation matters.

## Motivation

Consider a search-as-you-type interface where each keystroke triggers a network request:

```swift
let searchQueries = userInputField.textChanges

// Without flatMapLatest - all requests complete, wasting resources
for await query in searchQueries {
let results = try await searchAPI(query)
displayResults(results) // May display stale results
}
```

Without automatic cancellation, earlier requests continue to completion even though their results are no longer relevant. This wastes network bandwidth, server resources, and may display stale results to the user if a slower request completes after a faster one.

The `flatMapLatest` operator solves this by automatically cancelling iteration on the previous inner sequence whenever a new element arrives from the outer sequence:

```swift
let searchResults = searchQueries.flatMapLatest { query in
searchAPI(query)
}

for await result in searchResults {
displayResults(result) // Only latest results displayed
}
```

This pattern is broadly applicable:
- **Location-based queries**: Cancel previous location lookups when user moves
- **Dynamic configuration**: Restart data loading when settings change
- **Auto-save**: Only save the most recent changes when user types rapidly
- **Real-time data**: Switch to new data streams based on user selections

## Proposed Solution

The `flatMapLatest` algorithm transforms each element from the base `AsyncSequence` into a new inner `AsyncSequence` using a transform closure. When a new element is produced by the base sequence, iteration on the current inner sequence is cancelled, and iteration begins on the newly created sequence.

The interface is available on all `AsyncSequence` types where both the base and inner sequences are `Sendable`:

```swift
extension AsyncSequence where Self: Sendable {
public func flatMapLatest<T: AsyncSequence & Sendable>(
_ transform: @escaping @Sendable (Element) -> T
) -> AsyncFlatMapLatestSequence<Self, T>
}
```

This provides a clean API for expressing switching behavior:

```swift
userActions.flatMapLatest { action in
performAction(action)
}
```

## Detailed Design

The type that implements the algorithm emits elements from the inner sequences. It throws when either the base type or any inner sequence throws.

```swift
public struct AsyncFlatMapLatestSequence<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable>: AsyncSequence, Sendable
where Base.Element: Sendable, Inner.Element: Sendable {
public typealias Element = Inner.Element

public struct Iterator: AsyncIteratorProtocol, Sendable {
public func next() async throws -> Element?
}

public func makeAsyncIterator() -> Iterator
}
```

Since both the base sequence and inner sequences must be `Sendable` (to support concurrent iteration and cancellation), `AsyncFlatMapLatestSequence` is unconditionally `Sendable`.

### Implementation Strategy

The implementation uses a state machine pattern to ensure thread-safe operation:

1. **Generation Tracking**: Each new inner sequence is assigned a generation number. Elements from stale generations are discarded.
2. **Explicit Cancellation**: When a new outer element arrives, the previous inner sequence's task is explicitly cancelled and its continuation is resumed with a cancellation error.
3. **Lock-Based Coordination**: A `Lock` protects the state machine from concurrent access.
4. **Continuation Management**: The storage manages continuations for both upstream (outer/inner sequences) and downstream (consumer) demand.

This approach eliminates race conditions where cancelled sequences could emit stale values.

### Behavioral Characteristics

**Switching**: When the base sequence produces a new element, the current inner sequence iteration is immediately cancelled. Any elements it would have produced are lost.

**Completion**: The sequence completes when:
1. The base sequence finishes producing elements, AND
2. The final inner sequence finishes producing elements

**Error Handling**: If either the base sequence or any inner sequence throws, the error is immediately propagated and all tasks are cancelled.

**Cancellation**: Cancelling the downstream iteration cancels both the base sequence iteration and the current inner sequence iteration.

## Example

```swift
let requests = AsyncStream<String> { continuation in
continuation.yield("query1")
try? await Task.sleep(for: .milliseconds(100))
continuation.yield("query2")
try? await Task.sleep(for: .milliseconds(100))
continuation.yield("query3")
continuation.finish()
}

let responses = requests.flatMapLatest { query in
AsyncStream<String> { continuation in
continuation.yield("\(query): loading")
try? await Task.sleep(for: .milliseconds(50))
continuation.yield("\(query): complete")
continuation.finish()
}
}

for await response in responses {
print(response)
}
// Output (may vary due to timing):
// query1: loading
// query2: loading
// query3: loading
// query3: complete
```

In this example, the earlier queries (query1 and query2) are cancelled before they complete, so only query3 produces its complete response.

## Effect on API Resilience

This is an additive API. No existing systems are changed. The new types introduced are:
- `AsyncFlatMapLatestSequence<Base, Inner>`: The sequence type
- Associated private types for the state machine implementation

These types will be part of the ABI surface area.

## Alternatives Considered

### Alternative Names

**`switchMap`**: Used in ReactiveX, but "switch" in Swift has strong association with control flow statements.

**`switchToLatest`**: Combine's terminology, but `flatMapLatest` is more discoverable alongside other `map` variants.

**`flatMap(...).latest()`**: Requires a hypothetical `flatMap` first, adding complexity.

### Delivering All Elements

An alternative behavior would buffer elements from cancelled sequences and deliver them later. However, this contradicts the core purpose of "latest" semantics and would be better served by a different operator.

### No Automatic Cancellation

Requiring manual cancellation would place significant burden on developers and be error-prone. The automatic cancellation is the key value proposition.

## Comparison with Other Libraries

**ReactiveX**: ReactiveX has an [API definition of switchMap](https://reactivex.io/documentation/operators/flatmap.html) that performs the same operation for Observables, switching to the latest inner Observable.

**Combine**: Combine has an [API definition of switchToLatest()](https://developer.apple.com/documentation/combine/publisher/switchtolatest()) which subscribes to the most recent Publisher produced by an upstream Publisher of Publishers. `flatMapLatest` combines the map and switch operations into a single convenient operator.

**RxSwift**: RxSwift calls this operator `flatMapLatest`, which is the naming this proposal adopts.
139 changes: 139 additions & 0 deletions Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/FlatMapLatest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# FlatMapLatest

* Author(s): [Peter Friese](https://github.com/peterfriese)

Transforms elements into asynchronous sequences, emitting elements from only the most recent inner sequence.

[[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/FlatMapLatest/AsyncFlatMapLatestSequence.swift) |
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestFlatMapLatest.swift)]

```swift
let searchQuery = AsyncStream<String> { continuation in
// User types into search field
continuation.yield("swi")
try? await Task.sleep(for: .milliseconds(100))
continuation.yield("swift")
try? await Task.sleep(for: .milliseconds(100))
continuation.yield("swift async")
continuation.finish()
}

let searchResults = searchQuery.flatMapLatest { query in
performSearch(query) // Returns AsyncSequence<SearchResult>
}

for try await result in searchResults {
print(result) // Only shows results from "swift async"
}
```

## Introduction

When transforming elements of an asynchronous sequence into new asynchronous sequences, you often want to abandon previous sequences when new data arrives. This is particularly useful for scenarios like search-as-you-type, where each keystroke triggers a new search request and you only care about results from the most recent query.

The `flatMapLatest` operator solves this by cancelling iteration on the previous inner sequence whenever a new element arrives from the outer sequence.

## Proposed Solution

The `flatMapLatest` algorithm transforms each element from the base `AsyncSequence` into a new inner `AsyncSequence` using the provided `transform` closure. When a new element is produced by the base sequence, iteration on the current inner sequence is cancelled, and iteration begins on the newly created sequence.

```swift
extension AsyncSequence where Self: Sendable {
public func flatMapLatest<T: AsyncSequence & Sendable>(
_ transform: @escaping @Sendable (Element) -> T
) -> AsyncFlatMapLatestSequence<Self, T>
}
```

This creates a concise way to express switching behavior:

```swift
userInput.flatMapLatest { input in
fetchDataFromNetwork(input)
}
```

In this case, each new user input cancels any ongoing network request and starts a fresh one, ensuring only the latest data is delivered.

## Detailed Design

The type that implements the algorithm emits elements from the inner sequences. It throws when either the base type or any inner sequence throws.

```swift
public struct AsyncFlatMapLatestSequence<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable>: AsyncSequence, Sendable
where Base.Element: Sendable, Inner.Element: Sendable {
public typealias Element = Inner.Element

public struct Iterator: AsyncIteratorProtocol, Sendable {
public func next() async throws -> Element?
}

public func makeAsyncIterator() -> Iterator
}
```

The implementation uses a state machine to ensure thread-safe operation and generation tracking to prevent stale values from cancelled sequences.

### Behavior

- **Switching**: When a new element arrives from the base sequence, the current inner sequence is cancelled immediately
- **Completion**: The sequence completes when the base sequence finishes and the final inner sequence completes
- **Error Handling**: Errors from either the base or inner sequences are propagated immediately
- **Cancellation**: Cancelling iteration cancels both the base and current inner sequence

## Use Cases

### Search as You Type

```swift
let searchField = AsyncStream<String> { continuation in
// Emit search queries as user types
continuation.yield("s")
continuation.yield("sw")
continuation.yield("swift")
}

let results = searchField.flatMapLatest { query in
searchAPI(for: query)
}
```

Only the results for "swift" will be emitted, as earlier queries are cancelled.

### Location-Based Data

```swift
let locationUpdates = CLLocationManager.shared.locationUpdates

let nearbyPlaces = locationUpdates.flatMapLatest { location in
fetchNearbyPlaces(at: location)
}
```

Each location update triggers a new search, cancelling any ongoing requests for previous locations.

### Dynamic Configuration

```swift
let settings = userSettingsStream

let data = settings.flatMapLatest { config in
loadData(with: config)
}
```

When settings change, data loading is restarted with the new configuration.

## Comparison with Other Operators

**`map`**: Transforms elements synchronously without producing sequences.

**`flatMap`** (if it existed): Would emit elements from all inner sequences concurrently, not cancelling previous ones.

**`switchToLatest`** (Combine): Equivalent operator in Combine framework - `flatMapLatest` is the `AsyncSequence` equivalent.

## Comparison with Other Libraries

**ReactiveX** ReactiveX has an [API definition of switchMap](https://reactivex.io/documentation/operators/flatmap.html) which performs the same operation for Observables.

**Combine** Combine has an [API definition of switchToLatest()](https://developer.apple.com/documentation/combine/publisher/switchtolatest()) which flattens a publisher of publishers by subscribing to the most recent one.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

@available(AsyncAlgorithms 1.0, *)
extension AsyncSequence where Self: Sendable {
/// Transforms elements into new asynchronous sequences, emitting elements
/// from the most recent inner sequence.
///
/// When a new element is emitted by this sequence, the `transform`
/// is called to produce a new inner sequence. Iteration on the
/// previous inner sequence is cancelled, and iteration begins
/// on the new one.
public func flatMapLatest<T: AsyncSequence & Sendable>(
_ transform: @escaping @Sendable (Element) -> T
) -> AsyncFlatMapLatestSequence<Self, T> {
return AsyncFlatMapLatestSequence(self, transform: transform)
}
}

@available(AsyncAlgorithms 1.0, *)
public struct AsyncFlatMapLatestSequence<Base: AsyncSequence & Sendable, Inner: AsyncSequence & Sendable>: AsyncSequence, Sendable where Base.Element: Sendable, Inner.Element: Sendable {
public typealias Element = Inner.Element

let base: Base
let transform: @Sendable (Base.Element) -> Inner

init(_ base: Base, transform: @escaping @Sendable (Base.Element) -> Inner) {
self.base = base
self.transform = transform
}

public func makeAsyncIterator() -> Iterator {
return Iterator(base: base, transform: transform)
}

public struct Iterator: AsyncIteratorProtocol, Sendable {
let storage: FlatMapLatestStorage<Base, Inner>

init(base: Base, transform: @escaping @Sendable (Base.Element) -> Inner) {
self.storage = FlatMapLatestStorage(base: base, transform: transform)
}

public func next() async throws -> Element? {
return try await storage.next()
}
}
}
Loading