Commit 6c0cfdd

Extract code for performing concurrent work on slices of a collection
1 parent c0f4e16 · commit 6c0cfdd

2 files changed: +101 −52 lines changed

Sources/SwiftDocC/Infrastructure/ConvertActionConverter.swift

Lines changed: 13 additions & 52 deletions

@@ -83,19 +83,10 @@ package enum ConvertActionConverter {
         let renderSignpostHandle = signposter.beginInterval("Render", id: signposter.makeSignpostID(), "Render \(context.knownPages.count) pages")
 
         // Render all pages and gather their supplementary "digest" information if enabled.
-        let supplementaryRenderInfo = try await withThrowingTaskGroup(of: SupplementaryRenderInformation.self) { taskGroup in
-            let coverageFilterClosure = documentationCoverageOptions.generateFilterClosure()
-            // Iterate over all the known pages in chunks
-            var remaining = context.knownPages[...]
-
-            // Don't run more tasks in parallel than there are cores to run them
-            let maxParallelTasks: Int = ProcessInfo.processInfo.processorCount
-            let numberOfElementsPerTask = max(
-                Int(Double(remaining.count) / Double(maxParallelTasks * 10) + 1),
-                25 // An arbitrary smallest task size to avoid some concurrency overhead when there aren't that many pages is too small.
-            )
-
-            func _render(referencesIn slice: consuming ArraySlice<ResolvedTopicReference>) throws -> SupplementaryRenderInformation {
+        let coverageFilterClosure = documentationCoverageOptions.generateFilterClosure()
+        let supplementaryRenderInfo = try await context.knownPages._concurrentPerform(
+            taskName: "Render",
+            batchWork: { slice in
                 var supplementaryRenderInfo = SupplementaryRenderInformation()
 
                 for identifier in slice {
@@ -111,11 +102,7 @@ package enum ConvertActionConverter {
 
                     switch documentationCoverageOptions.level {
                     case .detailed, .brief:
-                        let coverageEntry = try CoverageDataEntry(
-                            documentationNode: entity,
-                            renderNode: renderNode,
-                            context: context
-                        )
+                        let coverageEntry = try CoverageDataEntry(documentationNode: entity, renderNode: renderNode, context: context)
                         if coverageFilterClosure(coverageEntry) {
                             supplementaryRenderInfo.coverageInfo.append(coverageEntry)
                         }
@@ -139,41 +126,15 @@ package enum ConvertActionConverter {
                 }
 
                 return supplementaryRenderInfo
+            },
+            initialResult: SupplementaryRenderInformation(),
+            combineResults: { accumulated, partialResult in
+                accumulated.assets.merge(partialResult.assets, uniquingKeysWith: +)
+                accumulated.linkSummaries.append(contentsOf: partialResult.linkSummaries)
+                accumulated.indexingRecords.append(contentsOf: partialResult.indexingRecords)
+                accumulated.coverageInfo.append(contentsOf: partialResult.coverageInfo)
             }
-
-            for _ in 0..<maxParallelTasks {
-                if !remaining.isEmpty {
-                    let slice = remaining.prefix(numberOfElementsPerTask)
-                    remaining = remaining.dropFirst(numberOfElementsPerTask)
-
-                    // Start work of one slice of the known pages
-                    taskGroup.addTask {
-                        return try _render(referencesIn: slice)
-                    }
-                }
-            }
-
-            var aggregateSupplementaryRenderInfo = SupplementaryRenderInformation()
-
-            for try await partialInfo in taskGroup {
-                aggregateSupplementaryRenderInfo.assets.merge(partialInfo.assets, uniquingKeysWith: +)
-                aggregateSupplementaryRenderInfo.linkSummaries.append(contentsOf: partialInfo.linkSummaries)
-                aggregateSupplementaryRenderInfo.indexingRecords.append(contentsOf: partialInfo.indexingRecords)
-                aggregateSupplementaryRenderInfo.coverageInfo.append(contentsOf: partialInfo.coverageInfo)
-
-                if !remaining.isEmpty {
-                    let slice = remaining.prefix(numberOfElementsPerTask)
-                    remaining = remaining.dropFirst(numberOfElementsPerTask)
-
-                    // Start work of one slice of the known pages
-                    taskGroup.addTask {
-                        return try _render(referencesIn: slice)
-                    }
-                }
-            }
-
-            return aggregateSupplementaryRenderInfo
-        }
+        )
 
         signposter.endInterval("Render", renderSignpostHandle)
 
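For illustration, here is a minimal sketch of how a call site inside the SwiftDocC module could adopt the extracted helper, following the same batchWork/initialResult/combineResults shape as the change above. The collection and the counting work are hypothetical; only the `_concurrentPerform(taskName:batchWork:initialResult:combineResults:)` signature comes from this commit, and the call is assumed to run in an async, throwing context.

// A minimal usage sketch (not part of this commit). The "words" collection and the
// character counting are made up; the helper's signature is taken from the new
// Collection extension added below.
let words = ["swift", "docc", "render", "concurrent", "slices"]

let totalCharacterCount = try await words._concurrentPerform(
    taskName: "Count characters",
    batchWork: { slice in
        // Each task counts the characters in one slice of the collection.
        slice.reduce(0) { $0 + $1.count }
    },
    initialResult: 0,
    combineResults: { total, partialCount in
        // Partial counts arrive out of order as tasks finish; summing is order independent.
        total += partialCount
    }
)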
Sources/SwiftDocC/Utility/Collection+ConcurrentPerform.swift

Lines changed: 88 additions & 0 deletions

@@ -144,3 +144,91 @@ extension Collection where Index == Int, Self: SendableMetatype {
         return allResults.sync({ $0 })
     }
 }
+
+extension Collection {
+    /// Concurrently performs work on slices of the collection's elements, combining the partial results into a final result.
+    ///
+    /// This method is intended as a building block that other higher-level `concurrent...` methods can be built upon.
+    /// That said, calling code can opt to use this method directly as opposed to writing overly specific single-use helper methods.
+    ///
+    /// - Parameters:
+    ///   - taskName: A human readable name of the tasks that the collection uses to perform this work.
+    ///   - batchWork: The concurrent work to perform on each slice of the collection's elements.
+    ///   - initialResult: The initial result to accumulate the partial results into.
+    ///   - combineResults: A closure that updates the accumulated result with a partial result from performing the work over one slice of the collection's elements.
+    /// - Returns: The final result of accumulating all partial results, out of order, into the initial result.
+    func _concurrentPerform<Result, PartialResult>(
+        taskName: String? = nil,
+        batchWork: (consuming SubSequence) throws -> PartialResult,
+        initialResult: Result,
+        combineResults: (inout Result, consuming PartialResult) -> Void
+    ) async throws -> Result {
+        try await withThrowingTaskGroup(of: PartialResult.self, returning: Result.self) { taskGroup in
+            try await withoutActuallyEscaping(batchWork) { work in
+                try await withoutActuallyEscaping(combineResults) { combineResults in
+                    var remaining = self[...]
+
+                    // Don't run more tasks in parallel than there are cores to run them
+                    let maxParallelTasks: Int = ProcessInfo.processInfo.processorCount
+                    // Finding the right number of tasks is a balancing act.
+                    // If the tasks are too small, then there's increased overhead from scheduling a lot of tasks and accumulating their results.
+                    // If the tasks are too large, then there's a risk that some tasks take longer to complete than others, increasing the amount of idle time.
+                    //
+                    // Here, we aim to schedule at most 10 tasks per core but create fewer tasks if the collection is fairly small to avoid some concurrent overhead.
+                    // The table below shows the approximate number of tasks per CPU core and the number of elements per task, within parenthesis,
+                    // for different collection sizes and number of CPU cores, given a minimum task size of 20 elements:
+                    //
+                    //           |    500    |    1000   |    2500    |    5000    |    10000    |    25000
+                    // ----------|-----------|-----------|------------|------------|-------------|-------------
+                    //  8 cores  | ~3,2 (20) | ~6,3 (20) | ~9,8 (32)  | ~9,9 (63)  | ~9,9 (126)  | ~9,9 (313)
+                    // 12 cores  | ~2,1 (20) | ~4,2 (20) | ~10,0 (21) | ~10,0 (42) | ~10,0 (84)  | ~10,0 (209)
+                    // 16 cores  | ~1,6 (20) | ~3,2 (20) | ~7,9 (20)  | ~9,8 (32)  | ~9,9 (63)   | ~10,0 (157)
+                    // 32 cores  | ~0,8 (20) | ~1,6 (20) | ~4,0 (20)  | ~7,9 (20)  | ~9,8 (32)   | ~9,9 (79)
+                    //
+                    let numberOfElementsPerTask: Int = Swift.max(
+                        Int(Double(remaining.count) / Double(maxParallelTasks * 10) + 1),
+                        20 // (this is a completely arbitrary task size threshold)
+                    )
+
+                    // Start the first round of work.
+                    // If the collection is big, this will add one task per core.
+                    // If the collection is small, this will only add a few tasks.
+                    for _ in 0..<maxParallelTasks {
+                        if !remaining.isEmpty {
+                            let slice = remaining.prefix(numberOfElementsPerTask)
+                            remaining = remaining.dropFirst(numberOfElementsPerTask)
+
+                            // Start work of one slice of the known pages
+                            taskGroup.addTask(name: taskName) {
+                                return try work(slice)
+                            }
+                        }
+                    }
+
+                    var result = initialResult
+
+                    for try await partialResult in taskGroup {
+                        // Check if the larger task group has been cancelled and if so, avoid doing any further work.
+                        try Task.checkCancellation()
+
+                        combineResults(&result, partialResult)
+
+                        // Now that one task has finished, and one core is available for work,
+                        // see if we have more slices to process and add one more task to process that slice.
+                        if !remaining.isEmpty {
+                            let slice = remaining.prefix(numberOfElementsPerTask)
+                            remaining = remaining.dropFirst(numberOfElementsPerTask)
+
+                            // Start work of one slice of the known pages
+                            taskGroup.addTask(name: taskName) {
+                                return try work(slice)
+                            }
+                        }
+                    }
+
+                    return result
+                }
+            }
+        }
+    }
+}
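The doc comment above describes `_concurrentPerform` as a building block for higher-level `concurrent...` methods. As an illustration of that idea only, a hypothetical `_concurrentMap` could be layered on top of it as sketched below; the name `_concurrentMap` and this helper are assumptions, not part of the commit.

// A hypothetical higher-level helper built on `_concurrentPerform` (not part of this commit).
extension Collection {
    /// Concurrently transforms the collection's elements, returning the results in an unspecified order.
    func _concurrentMap<Transformed>(
        taskName: String? = nil,
        _ transform: (Element) throws -> Transformed
    ) async throws -> [Transformed] {
        try await _concurrentPerform(
            taskName: taskName,
            batchWork: { slice in
                // Transform one slice of elements within a single task.
                try slice.map(transform)
            },
            initialResult: [],
            combineResults: { allResults, partialResults in
                // Partial results are appended as tasks finish, so the overall order is not guaranteed.
                allResults.append(contentsOf: partialResults)
            }
        )
    }
}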

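The task-sizing comment in the new extension derives `numberOfElementsPerTask` from the collection size and the core count. As a quick sanity check of one cell of that table (an illustration, not part of the commit, with the core and element counts hard-coded):

// Check the "8 cores, 5000 elements" column of the table above.
let processorCount = 8   // stand-in for ProcessInfo.processInfo.processorCount
let elementCount = 5000  // stand-in for remaining.count

// Same formula as `numberOfElementsPerTask`: 5000 / (8 * 10) + 1 = 63.5, truncated to 63,
// which is above the minimum task size of 20.
let elementsPerTask = max(Int(Double(elementCount) / Double(processorCount * 10) + 1), 20)

// Roughly 5000 / 63 ≈ 79.4 tasks spread over 8 cores ≈ 9.9 tasks per core,
// matching the "~9,9 (63)" entry in the table.
let approximateTasksPerCore = Double(elementCount) / Double(elementsPerTask) / Double(processorCount)

print(elementsPerTask, approximateTasksPerCore) // 63 9.92...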