I’ll focus on Combine and Swift concurrency for one specific use case: a potentially infinite sequence of events that could overwhelm the consumer, or at best, merely waste resources by generating excessive values that the consumer might not even require.

The values from these sequences cannot be dropped using debounce or throttle operators, as is typically done with certain UI events. Hence the need for proper backpressure support.

Subscription and Task cancellation


An important aspect from an ergonomic standpoint is the lifecycle management of an entity that could potentially emit an infinite number of values.

Combine’s FRP pipelines are designed to be strongly held by the parent entity (ViewModel/ViewController) in a collection of AnyCancellables, so their lifetimes can be closely tied to that of the parent. This works well with .store(in: &cancellables).

Out of the box, Swift Tasks are somewhat more awkward in that regard. However, by extending the Task type to hand out an AnyCancellable, the Task lifetime can be tied to the parent in the same way as Combine’s Subscription/Subscriber lifetime management.

One major caveat is that cancellation is cooperative: you are responsible for inserting try Task.checkCancellation() checks within the task’s body, or in this case, inside the async loop that awaits events from an async sequence. The more frequently you check, the sooner the task stops after being cancelled. It is good practice to check for cancellation periodically within intensive loops or before starting other expensive work.

import Combine

extension Task {
    // Cancels the task when the stored AnyCancellable is released or cancelled.
    func store(in cancellables: inout Set<AnyCancellable>) {
        AnyCancellable { self.cancel() }
            .store(in: &cancellables)
    }
}

class ViewModel {
    private var cancellables = Set<AnyCancellable>()

    func doAsyncWork() {
        Task {
            // someAsyncSequence stands for any AsyncSequence available here
            for await value in someAsyncSequence {
                try Task.checkCancellation()
                // do something with the value
            }
        }
        .store(in: &cancellables)
    }
}

Using AsyncSequences across API boundaries


A more cumbersome use case involves a Swift package (or really any module) with a function that returns an AsyncSequence, which internally performs some transformations and merges with several other sequences. It’s easy to end up with silly type signatures like these:

AsyncMerge2Sequence<CustomAsyncSequenceA, AsyncMapSequence<CustomAsyncSequenceB, String>>

Examining the standard library tools, one might consider wrapping that mess in a neat AsyncStream<Element> to hide the internal details. However, doing so would be a misuse of AsyncStream, which is designed as a bridge between non-concurrent and concurrent contexts.

The main drawback is that AsyncStream does not support backpressure: by default it will readily buffer values for later consumption without any limit. An alternative that supports backpressure is the AsyncChannel type, implemented in the swift-async-algorithms package. This type is specifically designed for communication between two concurrent tasks. However, it does introduce a little overhead.

The most ergonomic approach would be for AsyncSequence to have an eraseToAnyAsyncSequence() function, similar to what Combine offers for its Publishers with eraseToAnyPublisher(), but no such function is available out of the box. Fortunately, there are packages that introduce this functionality with relatively little overhead.
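To make the buffering behaviour concrete, here is a minimal sketch that uses only the standard library. The helper name drain(policy:) is made up for this illustration. The producer pushes 1,000 values before any consumer is attached, and yield never suspends it.

```swift
// Hypothetical helper: returns everything the stream delivers after the
// producer has already pushed 1_000 values with no consumer attached.
func drain(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        for value in 0..<1_000 {
            // yield returns immediately; the producer is never suspended
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

With the default .unbounded policy all 1,000 values sit in the buffer until the consumer gets to them. With .bufferingNewest(1) the buffer stays small, but only because older values are thrown away, so the consumer ends up seeing just the last one. Neither policy slows the producer down.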
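For illustration, a type eraser of this kind could look roughly like the sketch below. AnyAsyncSequence, eraseToAnyAsyncSequence() and doubledValues() are names chosen for this example, not standard library API, and the code assumes the Swift 5 language mode (stricter concurrency checking may require adjustments).

```swift
// Hypothetical type eraser: hides the concrete sequence type behind its Element.
struct AnyAsyncSequence<Element>: AsyncSequence {
    struct AsyncIterator: AsyncIteratorProtocol {
        let nextValue: () async throws -> Element?

        mutating func next() async throws -> Element? {
            try await nextValue()
        }
    }

    private let makeErasedIterator: () -> AsyncIterator

    init<S: AsyncSequence>(_ base: S) where S.Element == Element {
        makeErasedIterator = {
            // The wrapped iterator is only advanced when the consumer asks
            // for a value, so the pull-based behaviour is preserved.
            var iterator = base.makeAsyncIterator()
            return AsyncIterator(nextValue: { try await iterator.next() })
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        makeErasedIterator()
    }
}

extension AsyncSequence {
    func eraseToAnyAsyncSequence() -> AnyAsyncSequence<Element> {
        AnyAsyncSequence(self)
    }
}

// Example API boundary: callers see AnyAsyncSequence<Int>,
// not AsyncMapSequence<AsyncStream<Int>, Int>.
func doubledValues() -> AnyAsyncSequence<Int> {
    let source = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
    return source.map { $0 * 2 }.eraseToAnyAsyncSequence()
}
```

Because the erased iterator can throw, callers iterate with for try await. The price of the tidy signature is one extra closure call per element.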

More on this topic on the Swift forums:

Backpressure support


In this section, I’ll ignore the debounce and throttle operators. Both are present in Combine and in the swift-async-algorithms package for Swift concurrency. However, for the use case I’m focusing on, they are unsuitable because they allow the producer to run and waste resources while the consumer is busy with something else.

Both Combine and Swift concurrency can handle backpressure, but I have to give credit to Swift concurrency for its simple and intuitive built-in support for managing potentially infinite async sequences.

A task awaiting values from an AsyncSequence will not get overwhelmed: the for await loop asks for the next value only after the loop body has finished, so the code producing the values stays suspended while the consumer processes the current value and performs any additional necessary actions.

Task {
    for await value in infiniteSequence {
        // the producer of infiniteSequence stays suspended while
        // the consumer runs `doSomething(with:)` on the value
        await doSomething(with: value)
    }
}
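The pull-based behaviour can be checked with a small, self-contained sketch. Naturals, ProductionLog and consumeThree() are hypothetical names invented for this example. The sequence is infinite, yet it only ever produces as many values as the slow consumer asks for.

```swift
// Hypothetical log that records how many values the sequence has produced.
final class ProductionLog {
    var produced = 0
}

// An infinite, pull-based sequence of 1, 2, 3, ...
struct Naturals: AsyncSequence {
    typealias Element = Int

    let log: ProductionLog

    struct AsyncIterator: AsyncIteratorProtocol {
        let log: ProductionLog
        var current = 0

        mutating func next() async -> Int? {
            // Runs only when the consumer asks for the next value.
            current += 1
            log.produced += 1
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(log: log)
    }
}

func consumeThree() async -> (consumed: [Int], produced: Int) {
    let log = ProductionLog()
    var consumed: [Int] = []

    for await value in Naturals(log: log) {
        consumed.append(value)
        // Simulate a slow consumer; nothing is produced while this sleeps.
        try? await Task.sleep(nanoseconds: 1_000_000)
        if consumed.count == 3 { break }
    }
    return (consumed, log.produced)
}
```

Calling consumeThree() returns the consumed values [1, 2, 3] and a production count of 3: the producer never ran ahead of the consumer, with no extra code needed to achieve that.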

I can’t really say the same about Combine’s backpressure handling. The Subscription entity that is responsible for producing new values bases its rate of production on the explicit demand of the Subscriber entity. Thus, properly setting up a pipeline to handle backpressure requires a custom Subscriber type similar to this one.

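import Combine
import Foundation

// A subscriber that pulls one value at a time instead of requesting unlimited demand.
class CountSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    var subscription: Subscription?

    func receive(subscription: Subscription) {
        // Keep the subscription so more values can be requested later.
        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("got value \(input)")

        // Simulate one second of work, then ask for exactly one more value.
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            self.subscription?.request(.max(1))
        }

        // No additional demand for now; the publisher has to wait.
        return Subscribers.Demand.none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        // ...
    }
}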

Handling backpressure in Combine is not the most ergonomic approach, so developers often don’t bother with it. Instead, they typically use throttle and debounce operators to manage UI events when necessary.

Conclusion


From an ergonomic standpoint, I like the look and feel of Swift concurrency. Some features are missing at the time of writing, such as the ability to have multiple Tasks consume the same AsyncSequence the way Combine allows with the share() and multicast(_:) operators, but these are not dealbreakers in most cases.

Some missing features can be addressed by using the Combine framework in combination with Swift concurrency. Since macOS 12.0 and iOS 15.0, Combine publishers offer a values property (public var values: AsyncPublisher<Self> for publishers that never fail) whose type implements the AsyncSequence protocol.
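A minimal sketch of that bridge, assuming an Apple platform where Combine is available (macOS 12.0 / iOS 15.0 or later). The function name collectPublishedValues() is invented for this example.

```swift
import Combine

// Hypothetical helper: consumes a Combine publisher with a plain for await loop.
func collectPublishedValues() async -> [Int] {
    // Sequence.publisher emits each element, then completes.
    let publisher = [1, 2, 3].publisher

    var received: [Int] = []
    // `values` exposes the publisher as an AsyncSequence.
    for await value in publisher.values {
        received.append(value)
    }
    return received
}
```

The loop ends when the publisher completes, so this call returns [1, 2, 3]. For publishers that can fail, values is an AsyncThrowingPublisher instead, and the loop becomes for try await.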