Extending Combine with a custom ShareReplay operator


January 29, 2020

#swift #combine

Combine is a functional reactive programming framework introduced by Apple on WWDC 2019. It was most probably inspired by ReactiveX and its implementations like RxSwift, RxJava, etc. There is even a cheatsheet you can use to compare Combine and RxSwift abstractions and find similar core components, operators, etc. It turns out that for some of the operators defined in ReactiveX there are no counterparts implemented in Combine.

In today’s article, I will share a way to extend the Combine framework with a custom ReplaySubject and ShareReplay operator.

ShareReplay operator

The ShareReplay is used to share a single subscription to the upstream publisher and replay items emitted by that one. It can be described with the next marble diagram:

ShareReplay Marble diagram

This operator could be useful when you’re doing an expensive operation like a network request. Most of the times there is no need to perform it for each subscriber. The better solution would be to execute it once, then cache and multicast the results.

Defining the ShareReplay operator

Let’s start with defining the ShareReplay operator. This could be done by creating an extension of the Publisher protocol:

extension Publisher {
    /// Provides a subject that shares a single subscription to the upstream publisher and
    /// replays at most `bufferSize` items emitted by that publisher
    /// - Parameter bufferSize: limits the number of items that can be replayed
    func shareReplay(_ bufferSize: Int) -> AnyPublisher<Output, Failure> {
        return multicast(subject: ReplaySubject(maxValues: bufferSize)).autoconnect().eraseToAnyPublisher()
    }
}

As you can see above, we are using multicast operator with a ReplaySubject instance. The subject is used to deliver elements to multiple downstream subscribers. The operator returns a connectable publisher and autoconnect() is used to simplify connecting or disconnecting from one.

It is important to note that any data flow within Combine framework involves three main components:

  • Publisher - describes how values and errors are produced over time.
  • Subscriber - registers on a Publisher to receive values, including completion.
  • Subscription - controls the flow of data from a Publisher to a Subscriber.

In our case we need to define ReplaySubject and ReplaySubjectSubscription classes that implement Subject and Subscription protocols accordingly.

Creating the ReplaySubject

Now we need to create the ReplaySubject class. It consumes the bufferSize via initializer to limit the number of items to replay. We’ll be using the buffer field to cache output values and NSRecursiveLock instance to make the class thread-safe.

final class ReplaySubject<Output, Failure: Error>: Subject {
    private var buffer = [Output]()
    private let bufferSize: Int
    private let lock = NSRecursiveLock()

    init(_ bufferSize: Int = 0) {
        self.bufferSize = bufferSize
    }
}

Subject is a publisher that allows outside callers to publish elements. To satisfy the Publisher protocol requirements we should implement the receive function that is called to attach a Subscriber to the Publisher.

extension ReplaySubject {
    private var subscriptions = [ReplaySubjectSubscription<Output, Failure>]()
    private var completion: Subscribers.Completion<Failure>?

    func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Failure == Failure, Downstream.Input == Output {
        lock.lock(); defer { lock.unlock() } ➊
        let subscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber))
        subscriber.receive(subscription: subscription) ➋
        subscriptions.append(subscription)
        subscription.replay(buffer, completion: completion) ➌
    }
}

➊ Protects the critical section of code with a lock.
➋ Creates the ReplaySubjectSubscription instance and sends it to the subscriber.
➌ Replays the buffered values and completion event for the current subscription.

Now it is time to define functions from the Subject protocol. These are used to establish demand for a new upstream subscription and propagate a value or completion event to the subscriber:

extension ReplaySubject {

    /// Establishes demand for a new upstream subscriptions
    func send(subscription: Subscription) {
        lock.lock(); defer { lock.unlock() }
        subscription.request(.unlimited)
    }

    /// Sends a value to the subscriber.
    func send(_ value: Output) {
        lock.lock(); defer { lock.unlock() }
        buffer.append(value)
        buffer = buffer.suffix(bufferSize)
        subscriptions.forEach { $0.receive(value) }
    }

    /// Sends a completion event to the subscriber.
    func send(completion: Subscribers.Completion<Failure>) {
        lock.lock(); defer { lock.unlock() }
        self.completion = completion
        subscriptions.forEach { subscription in subscription.receive(completion: completion) }
    }
}

Implementing a custom Subscription

Next we should implement ReplaySubjectSubscription class, that conforms to Subscription protocol. It could be defined as:

final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription {
    private let downstream: AnySubscriber<Output, Failure>
    private var isCompleted = false
    private var demand: Subscribers.Demand = .none

    init(downstream: AnySubscriber<Output, Failure>) {
        self.downstream = downstream
    }

    func request(_ newDemand: Subscribers.Demand) {
        demand += newDemand
    }

    func cancel() {
        isCompleted = true
    }

    func receive(_ value: Output) {
        guard !isCompleted, demand > 0 else { return }

        demand += downstream.receive(value)
        demand -= 1
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        guard !isCompleted else { return }
        isCompleted = true
        downstream.receive(completion: completion)
    }

    func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) {
        guard !isCompleted else { return }
        values.forEach { value in receive(value) }
        if let completion = completion { receive(completion: completion) }
    }
}

As you can see, the implementation is quite straight-forward. Most of the functions are used to send the values and completion event to the downstream subscriber. We are using isCompleted flag to check if subscription is finished or cancelled. Apart from that we keep track of subscriber’s demand, that indicates how many more elements the subscriber expects to receive.

Using the ShareReplay operator in practice

Just like that, we’ve extended Combine framework with a custom ShareReplay operator. As I mentioned earlier we can use one to cache the result of a network request and multicast it to multiple subscribers:

func getUsers(_ url: URL) -> AnyPublisher<Result<[User], Error>, Never> {
    return URLSession.shared
        .dataTaskPublisher(for: url)
        .map { $0.data }
        .decode(type: [User].self, decoder: JSONDecoder())
        .map { users in .success(users) }
        .catch { error in return Just(.failure(error)) }
        .subscribe(on: DispatchQueue(label: "networking"))
        .receive(on: RunLoop.main)
        .shareReplay(1)
        .eraseToAnyPublisher()
}

let users = getUsers(URL(string: "https://jsonplaceholder.typicode.com/users")!)

let sub1 = users.sink(receiveValue: { value in
    print("subscriber1: \(value)\n")
})

let sub2 = users.sink(receiveValue: { value in
    print("subscriber2: \(value)\n")
})

Conclusion

In this article we’ve learned the way to define a custom ShareReplay operator, that could be helpful in reducing duplicated work and enhancing user experience. If you’d like to dive deeply into Combine, I’d suggest having a look at:

  • Using Combine - an intermediate to advanced book, that goes into details on how to use the Combine framework provided by Apple.
  • OpenCombine - Open-source implementation of Apple’s Combine framework.

You can find the source code of everything described in this blog post on Github. Feel free to play around and reach me out on Twitter if you have any questions, suggestions or feedback.

Thanks for reading!