2023 Developer ToolsSwift
WWDC23 · 24 min · Developer Tools / Swift
Beyond the basics of structured concurrency
It’s all about the task tree: Find out how structured concurrency can help your apps manage automatic task cancellation, task priority propagation, and useful task-local value patterns. Learn how to manage resources in your app with useful patterns and the latest task group APIs. We’ll show you how you can leverage the power of the task tree and task-local values to gain insight into distributed systems. Before watching, review the basics of Swift Concurrency and structured concurrency by checking out “Swift concurrency: Behind the scenes” and “Explore structured concurrency in Swift” from WWDC21.
Watch at developer.apple.com ↗Chapters
- 0:56 — Structured concurrency
- 3:11 — Task tree
- 3:44 — Task cancellation
- 5:26 — withTaskCancellationHandler
- 8:36 — Task priority
- 10:23 — Patterns with task groups
- 11:27 — Limiting concurrent tasks in TaskGroups
- 12:22 — DiscardingTaskGroup
- 13:53 — Task-local values
- 16:58 — swift-log
- 17:19 — MetadataProvider
- 18:58 — Task traces
- 19:46 — Swift-Distributed-Tracing
- 20:42 — Instrumenting distributed computations
- 23:38 — Wrap-up
Code shown on screen · 21 snippets
Unstructured concurrency
func makeSoup(order: Order) async throws -> Soup {
let boilingPot = Task { try await stove.boilBroth() }
let choppedIngredients = Task { try await chopIngredients(order.ingredients) }
let meat = Task { await marinate(meat: .chicken) }
let soup = await Soup(meat: meat.value, ingredients: choppedIngredients.value)
return await stove.cook(pot: boilingPot.value, soup: soup, duration: .minutes(10))
} Structured concurrency
func makeSoup(order: Order) async throws -> Soup {
async let pot = stove.boilBroth()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
} Structured concurrency
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
for ingredient in ingredients {
group.addTask { await chop(ingredient) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for await choppedIngredient in group {
if choppedIngredient != nil {
choppedIngredients.append(choppedIngredient!)
}
}
return choppedIngredients
}
} Task cancellation
func makeSoup(order: Order) async throws -> Soup {
async let pot = stove.boilBroth()
guard !Task.isCancelled else {
throw SoupCancellationError()
}
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
} Task cancellation
func chopIngredients(_ ingredients: [any Ingredient]) async throws -> [any ChoppedIngredient] {
return try await withThrowingTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
try Task.checkCancellation()
// Concurrently chop ingredients
for ingredient in ingredients {
group.addTask { await chop(ingredient) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for try await choppedIngredient in group {
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
} Cancellation and async sequences
actor Cook {
func handleShift<Orders>(orders: Orders) async throws
where Orders: AsyncSequence,
Orders.Element == Order {
for try await order in orders {
let soup = try await makeSoup(order)
// ...
}
}
} Cancellation and async sequences
public func next() async -> Order? {
return await withTaskCancellationHandler {
let result = await kitchen.generateOrder()
guard state.isRunning else {
return nil
}
return result
} onCancel: {
state.cancel()
}
} AsyncSequence state machine
private final class OrderState: Sendable {
let protectedIsRunning = ManagedAtomic<Bool>(true)
var isRunning: Bool {
get { protectedIsRunning.load(ordering: .acquiring) }
set { protectedIsRunning.store(newValue, ordering: .relaxed) }
}
func cancel() { isRunning = false }
} Limiting concurrency with TaskGroups
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
for ingredient in ingredients {
group.addTask { await chop(ingredient) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for await choppedIngredient in group {
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
} Limiting concurrency with TaskGroups
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
let maxChopTasks = min(3, ingredients.count)
for ingredientIndex in 0..<maxChopTasks {
group.addTask { await chop(ingredients[ingredientIndex]) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for await choppedIngredient in group {
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
} Limiting concurrency with TaskGroups
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
let maxChopTasks = min(3, ingredients.count)
for ingredientIndex in 0..<maxChopTasks {
group.addTask { await chop(ingredients[ingredientIndex]) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
var nextIngredientIndex = maxChopTasks
for await choppedIngredient in group {
if nextIngredientIndex < ingredients.count {
group.addTask { await chop(ingredients[nextIngredientIndex]) }
nextIngredientIndex += 1
}
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
} Limiting concurrency with TaskGroups
withTaskGroup(of: Something.self) { group in
for _ in 0..<maxConcurrentTasks {
group.addTask { }
}
while let <partial result> = await group.next() {
if !shouldStop {
group.addTask { }
}
}
} Kitchen Service
func run() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
for cook in staff.keys {
group.addTask { try await cook.handleShift() }
}
group.addTask {
// keep the restaurant going until closing time
try await Task.sleep(for: shiftDuration)
}
try await group.next()
// cancel all ongoing shifts
group.cancelAll()
}
} Introducing DiscardingTaskGroups
func run() async throws {
try await withThrowingDiscardingTaskGroup { group in
for cook in staff.keys {
group.addTask { try await cook.handleShift() }
}
group.addTask { // keep the restaurant going until closing time
try await Task.sleep(for: shiftDuration)
throw TimeToCloseError()
}
}
} TaskLocal values
actor Kitchen {
static var orderID: Int?
static var cook: String?
func logStatus() {
print("Current cook: \(Kitchen.cook ?? "none")")
}
}
let kitchen = Kitchen()
await kitchen.logStatus()
await Kitchen.$cook.withValue("Sakura") {
await kitchen.logStatus()
}
await kitchen.logStatus() Logging
func makeSoup(order: Order) async throws -> Soup {
log.debug("Preparing dinner", [
"cook": "\(self.name)",
"order-id": "\(order.id)",
"vegetable": "\(vegetable)",
])
// ...
}
func chopVegetables(order: Order) async throws -> [Vegetable] {
log.debug("Chopping ingredients", [
"cook": "\(self.name)",
"order-id": "\(order.id)",
"vegetable": "\(vegetable)",
])
async let choppedCarrot = try chop(.carrot)
async let choppedPotato = try chop(.potato)
return try await [choppedCarrot, choppedPotato]
}
func chop(_ vegetable: Vegetable, order: Order) async throws -> Vegetable {
log.debug("Chopping vegetable", [
"cook": "\(self.name)",
"order-id": "\(order)",
"vegetable": "\(vegetable)",
])
// ...
} MetadataProvider in action
let orderMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let orderID = Kitchen.orderID {
metadata["orderID"] = "\(orderID)"
}
return metadata
} MetadataProvider in action
let orderMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let orderID = Kitchen.orderID {
metadata["orderID"] = "\(orderID)"
}
return metadata
}
let chefMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let chef = Kitchen.chef {
metadata["chef"] = "\(chef)"
}
return metadata
}
let metadataProvider = Logger.MetadataProvider.multiplex([orderMetadataProvider,
chefMetadataProvider])
LoggingSystem.bootstrap(StreamLogHandler.standardOutput, metadataProvider: metadataProvider)
let logger = Logger(label: "KitchenService") Logging with metadata providers
func makeSoup(order: Order) async throws -> Soup {
logger.info("Preparing soup order")
async let pot = stove.boilBroth()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
} Profile server-side execution
func makeSoup(order: Order) async throws -> Soup {
try await withSpan("makeSoup(\(order.id)") { span in
async let pot = stove.boilWater()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
} Profiling server-side execution
func makeSoup(order: Order) async throws -> Soup {
try await withSpan(#function) { span in
span.attributes["kitchen.order.id"] = order.id
async let pot = stove.boilWater()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
} Resources
Related sessions
-
43 min -
25 min -
28 min -
34 min -
39 min