Make tasks in Swift concurrency run serially

If your tasks do not have any await or other suspension points, you can just use an actor with non-async methods, and it will automatically perform them sequentially.
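
As a minimal sketch of that simple case (EventLog, perform and demoEventLog are hypothetical names I made up for illustration), a synchronous actor method runs to completion before the actor services the next call, even when many tasks call it at once:

```swift
// Hypothetical actor for illustration only.
actor EventLog {
    private(set) var events: [String] = []

    // Not async and no suspension points inside, so the two appends
    // for one id can never be interleaved with another call.
    func perform(_ id: Int) {
        events.append("start \(id)")
        events.append("end \(id)")
    }
}

// Calls perform from 50 concurrent child tasks and returns the log.
func demoEventLog() async -> [String] {
    let log = EventLog()
    await withTaskGroup(of: Void.self) { group in
        for id in 0 ..< 50 {
            group.addTask { await log.perform(id) }
        }
    }
    return await log.events
}
```

The order in which the ids appear is not guaranteed, but each "start" entry should be immediately followed by its matching "end" entry.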

But, when dealing with asynchronous actor methods, one must appreciate that actors are reentrant (see SE-0306: Actors – Actor Reentrancy). If you really are trying to have a series of asynchronous tasks run serially, you will want to manually have each subsequent task await the prior one. E.g.,

actor Foo {
    private var previousTask: Task<(), Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result

            return try await block()
        }
    }
}

There are two subtle aspects to the above:

  1. I use the capture list of [previousTask] to make sure to get a copy of the prior task.

  2. I perform await previousTask?.result inside the new task, not before it.

    If you await prior to creating the new task, you have a race: if you launch three tasks, both the second and the third will await the first task, i.e., the third task is not awaiting the second one.

And, perhaps needless to say, because this is within an actor, it avoids the need for a detached task, while keeping the main thread free.
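
To see the chaining in action, here is a rough sketch that repeats the actor above under the hypothetical name SerialTaskQueue. The waitUntilFinished helper, the Recorder actor and demoSerialQueue are my own additions for the demonstration, not part of the original pattern:

```swift
actor SerialTaskQueue {
    private var previousTask: Task<(), Error>?

    func add(block: @Sendable @escaping () async throws -> Void) {
        previousTask = Task { [previousTask] in
            // Wait for the prior task (ignoring its outcome) before starting.
            let _ = await previousTask?.result
            return try await block()
        }
    }

    // Hypothetical helper: waits for the most recently added task.
    func waitUntilFinished() async {
        let _ = await previousTask?.result
    }
}

// Hypothetical actor that just collects events for inspection.
actor Recorder {
    private(set) var events: [String] = []
    func record(_ event: String) { events.append(event) }
}

func demoSerialQueue() async -> [String] {
    let queue = SerialTaskQueue()
    let recorder = Recorder()
    for i in 0 ..< 3 {
        await queue.add {
            await recorder.record("start \(i)")
            try await Task.sleep(nanoseconds: 10_000_000) // suspension point
            await recorder.record("end \(i)")
        }
    }
    await queue.waitUntilFinished()
    return await recorder.events
}
```

Even though each block suspends in the middle, the next block does not start until the previous one has finished, so the recorded events come out as start 0, end 0, start 1, end 1, start 2, end 2.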

Note, asynchronous sequences (e.g., AsyncSequence protocol, or AsyncStream which simplifies the creation of a sequence) can also give you serial behavior.
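
For instance, a minimal sketch with AsyncStream might look like the following (processSerially is a hypothetical name, and the sleep merely stands in for real asynchronous work). The for-await-in loop does not pull the next element until the body for the current one has finished:

```swift
func processSerially() async -> [String] {
    // Produce three values up front; the default buffering policy is
    // unbounded, so they simply wait in the stream until consumed.
    let stream = AsyncStream<Int> { continuation in
        for i in 0 ..< 3 { continuation.yield(i) }
        continuation.finish()
    }

    var events: [String] = []
    for await i in stream {
        events.append("start \(i)")
        try? await Task.sleep(nanoseconds: 10_000_000) // simulated async work
        events.append("end \(i)")
    }
    return events
}
```

In a real app you would more likely keep the continuation around and yield values as requests come in, but the consuming loop would be the same.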

Or, AsyncChannel from Swift Async Algorithms is another great way to deal with a pipeline of requests triggering a serial execution of some block of code.

E.g., here is a serial download manager using AsyncChannel and a simple for-await-in loop to achieve serial behavior:

actor SerialDownloadManager {
    static let shared = SerialDownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()

    private init() {
        Task { try await startDownloader() }
    }

    // this sends URLs on the channel

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension SerialDownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        // this consumes the URLs on the channel

        for await url in urls {
            // if you want to observe in "points of interest"
            //
            // let id = OSSignpostID(log: poi)
            // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
            // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

            // download

            let (location, response) = try await self.session.download(from: url, delegate: nil)

            if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                let destination = folder.appending(component: url.lastPathComponent)
                try? FileManager.default.removeItem(at: destination)
                try FileManager.default.moveItem(at: location, to: destination)
            }
        }
    }
}

Then you can do things like:

func appendUrls() async {
    for i in 0 ..< 10 {
        await SerialDownloadManager.shared.append(baseUrl.appending(component: "\(i).jpg"))
    }
}

Or, if you want, you can allow for constrained concurrency with a task group, e.g., doing 4 at a time here:

actor DownloadManager {
    static let shared = DownloadManager()

    private let session: URLSession = …
    private let urls = AsyncChannel<URL>()
    private var count = 0
    private let maxConcurrency = 4       // change to 1 for serial downloads; 4-6 is a good balance between the benefits of concurrency and not overtaxing the server

    private init() {
        Task {
            do {
                try await startDownloader()
            } catch {
                logger.error("\(error, privacy: .public)")
            }
        }
    }

    func append(_ url: URL) async {
        await urls.send(url)
    }
}

private extension DownloadManager {
    func startDownloader() async throws {
        let folder = try FileManager.default
            .url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
            .appending(component: "downloads")

        try? FileManager.default.createDirectory(at: folder, withIntermediateDirectories: true)

        try await withThrowingTaskGroup(of: Void.self) { group in
            for await url in urls {
                count += 1
                if count > maxConcurrency { try await group.next() }

                group.addTask {
                    // if you want to observe in "points of interest"
                    //
                    // let id = OSSignpostID(log: poi)
                    // os_signpost(.begin, log: poi, name: "Download", signpostID: id, "%{public}@", url.lastPathComponent)
                    // defer { os_signpost(.end, log: poi, name: "Download", signpostID: id) }

                    // download

                    let (location, response) = try await self.session.download(from: url, delegate: nil)

                    if let response = response as? HTTPURLResponse, 200 ..< 300 ~= response.statusCode {
                        let destination = folder.appending(component: url.lastPathComponent)
                        try? FileManager.default.removeItem(at: destination)
                        try FileManager.default.moveItem(at: location, to: destination)
                    }
                }
            }

            try await group.waitForAll()
        }
    }
}
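
If you want to convince yourself that this "wait for one before adding another" pattern really caps the concurrency, here is a stripped-down sketch without the networking or AsyncChannel. ConcurrencyTracker and runConstrained are hypothetical names, and the sleep stands in for a download:

```swift
// Hypothetical actor that records how many jobs overlap at once.
actor ConcurrencyTracker {
    private var current = 0
    private(set) var peak = 0

    func begin() {
        current += 1
        peak = max(peak, current)
    }

    func end() { current -= 1 }
}

// Runs jobCount simulated jobs, never more than maxConcurrency at a time,
// and returns the highest number of jobs observed running simultaneously.
func runConstrained(jobCount: Int, maxConcurrency: Int) async -> Int {
    let tracker = ConcurrencyTracker()
    await withTaskGroup(of: Void.self) { group in
        for index in 0 ..< jobCount {
            // Once the group is full, wait for one job to finish first.
            if index >= maxConcurrency { _ = await group.next() }

            group.addTask {
                await tracker.begin()
                try? await Task.sleep(nanoseconds: 10_000_000) // simulated work
                await tracker.end()
            }
        }
    }
    return await tracker.peak
}
```

Calling runConstrained(jobCount: 20, maxConcurrency: 4) should report a peak of no more than 4, and passing 1 for maxConcurrency gives serial behavior.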

For more information on asynchronous sequences, in general, see WWDC 2021 video Meet AsyncSequence.
