@@ -28,7 +28,12 @@ public class RegistryDownloadsManager: AsyncCancellable {
2828 private let registryClient : RegistryClient
2929 private let delegate : Delegate ?
3030
31- private var pendingLookups = [ PackageIdentity: DispatchGroup] ( )
31+ struct PackageLookup : Hashable {
32+ let package : PackageIdentity
33+ let version : Version
34+ }
35+
36+ private var pendingLookups = [ PackageLookup : Task < AbsolutePath , Error > ] ( )
3237 private var pendingLookupsLock = NSLock ( )
3338
3439 public init (
@@ -44,24 +49,77 @@ public class RegistryDownloadsManager: AsyncCancellable {
4449 self . registryClient = registryClient
4550 self . delegate = delegate
4651 }
47-
52+
4853 public func lookup(
4954 package : PackageIdentity ,
5055 version: Version ,
5156 observabilityScope: ObservabilityScope ,
5257 delegateQueue: DispatchQueue ,
5358 callbackQueue: DispatchQueue
5459 ) async throws -> AbsolutePath {
55- try await withCheckedThrowingContinuation { continuation in
56- self . lookup (
57- package : package ,
58- version: version,
59- observabilityScope: observabilityScope,
60- delegateQueue: delegateQueue,
61- callbackQueue: callbackQueue,
62- completion: { continuation. resume ( with: $0) }
63- )
60+ let packageRelativePath : RelativePath
61+ let packagePath : AbsolutePath
62+
63+ packageRelativePath = try package . downloadPath ( version: version)
64+ packagePath = self . path. appending ( packageRelativePath)
65+
66+ // TODO: we can do some finger-print checking to improve the validation
67+ // already exists and valid, we can exit early
68+ if try self . fileSystem. validPackageDirectory ( packagePath) {
69+ return packagePath
6470 }
71+
72+ let lookupId = PackageLookup ( package : package , version: version)
73+ let task = await withCheckedContinuation { continuation in
74+ self . pendingLookupsLock. lock ( )
75+ defer { self . pendingLookupsLock. unlock ( ) }
76+
77+ // Check if we've already resolved/are in the process of resolving for this package.
78+ if let inFlight = self . pendingLookups [ lookupId] {
79+ continuation. resume ( returning: inFlight)
80+ } else {
81+ let lookupTask = Task {
82+ // inform delegate that we are starting to fetch
83+ // calculate if cached (for delegate call) outside queue as it may change while queue is processing
84+ let isCached = self . cachePath. map { self . fileSystem. exists ( $0. appending ( packageRelativePath) ) } ?? false
85+ delegateQueue. async { [ delegate = self . delegate] in
86+ let details = FetchDetails ( fromCache: isCached, updatedCache: false )
87+ delegate? . willFetch ( package : package , version: version, fetchDetails: details)
88+ }
89+
90+ // make sure destination is free.
91+ try ? self . fileSystem. removeFileTree ( packagePath)
92+
93+ let start = DispatchTime . now ( )
94+ do {
95+ let result = try await self . downloadAndPopulateCache (
96+ package : package ,
97+ version: version,
98+ packagePath: packagePath,
99+ observabilityScope: observabilityScope,
100+ delegateQueue: delegateQueue,
101+ callbackQueue: callbackQueue,
102+ )
103+ // inform delegate that we finished to fetch
104+ let duration = start. distance ( to: . now( ) )
105+ delegateQueue. async { [ delegate = self . delegate] in
106+ delegate? . didFetch ( package : package , version: version, result: . success( result) , duration: duration)
107+ }
108+ } catch {
109+ let duration = start. distance ( to: . now( ) )
110+ delegateQueue. async { [ delegate = self . delegate] in
111+ delegate? . didFetch ( package : package , version: version, result: . failure( error) , duration: duration)
112+ }
113+ throw error
114+ }
115+ return packagePath
116+ }
117+
118+ self . pendingLookups [ lookupId] = lookupTask
119+ continuation. resume ( returning: lookupTask)
120+ }
121+ }
122+ return try await task. value
65123 }
66124
67125 @available ( * , noasync, message: " Use the async alternative " )
@@ -73,82 +131,14 @@ public class RegistryDownloadsManager: AsyncCancellable {
73131 callbackQueue: DispatchQueue ,
74132 completion: @escaping ( Result < AbsolutePath , Error > ) -> Void
75133 ) {
76- // wrap the callback in the requested queue
77- let completion = { result in callbackQueue. async { completion ( result) } }
78-
79- let packageRelativePath : RelativePath
80- let packagePath : AbsolutePath
81-
82- do {
83- packageRelativePath = try package . downloadPath ( version: version)
84- packagePath = self . path. appending ( packageRelativePath)
85-
86- // TODO: we can do some finger-print checking to improve the validation
87- // already exists and valid, we can exit early
88- if try self . fileSystem. validPackageDirectory ( packagePath) {
89- return completion ( . success( packagePath) )
90- }
91- } catch {
92- return completion ( . failure( error) )
93- }
94-
95- // next we check if there is a pending lookup
96- self . pendingLookupsLock. lock ( )
97- if let pendingLookup = self . pendingLookups [ package ] {
98- self . pendingLookupsLock. unlock ( )
99- // chain onto the pending lookup
100- pendingLookup. notify ( queue: callbackQueue) {
101- // at this point the previous lookup should be complete and we can re-lookup
102- self . lookup (
103- package : package ,
104- version: version,
105- observabilityScope: observabilityScope,
106- delegateQueue: delegateQueue,
107- callbackQueue: callbackQueue,
108- completion: completion
109- )
110- }
111- } else {
112- // record the pending lookup
113- assert ( self . pendingLookups [ package ] == nil )
114- let group = DispatchGroup ( )
115- group. enter ( )
116- self . pendingLookups [ package ] = group
117- self . pendingLookupsLock. unlock ( )
118-
119- // inform delegate that we are starting to fetch
120- // calculate if cached (for delegate call) outside queue as it may change while queue is processing
121- let isCached = self . cachePath. map { self . fileSystem. exists ( $0. appending ( packageRelativePath) ) } ?? false
122- delegateQueue. async {
123- let details = FetchDetails ( fromCache: isCached, updatedCache: false )
124- self . delegate? . willFetch ( package : package , version: version, fetchDetails: details)
125- }
126-
127- // make sure destination is free.
128- try ? self . fileSystem. removeFileTree ( packagePath)
129-
130- let start = DispatchTime . now ( )
131- self . downloadAndPopulateCache (
134+ self . executeAsync ( completion, on: callbackQueue) {
135+ try await self . lookup (
132136 package : package ,
133137 version: version,
134- packagePath: packagePath,
135138 observabilityScope: observabilityScope,
136139 delegateQueue: delegateQueue,
137- callbackQueue: callbackQueue
138- ) { result in
139- // inform delegate that we finished to fetch
140- let duration = start. distance ( to: . now( ) )
141- delegateQueue. async {
142- self . delegate? . didFetch ( package : package , version: version, result: result, duration: duration)
143- }
144- // remove the pending lookup
145- self . pendingLookupsLock. lock ( )
146- self . pendingLookups [ package ] ? . leave ( )
147- self . pendingLookups [ package ] = nil
148- self . pendingLookupsLock. unlock ( )
149- // and done
150- completion ( result. map { _ in packagePath } )
151- }
140+ callbackQueue: callbackQueue,
141+ )
152142 }
153143 }
154144
@@ -164,15 +154,15 @@ public class RegistryDownloadsManager: AsyncCancellable {
164154 observabilityScope: ObservabilityScope ,
165155 delegateQueue: DispatchQueue ,
166156 callbackQueue: DispatchQueue ,
167- completion: @escaping @Sendable ( Result < FetchDetails , Error > ) -> Void
168- ) {
157+ ) async throws -> FetchDetails {
169158 if let cachePath {
170159 do {
171160 let relativePath = try package . downloadPath ( version: version)
172161 let cachedPackagePath = cachePath. appending ( relativePath)
173162
174163 try self . initializeCacheIfNeeded ( cachePath: cachePath)
175- try self . fileSystem. withLock ( on: cachedPackagePath, type: . exclusive) {
164+
165+ return try await self . fileSystem. withLock ( on: cachedPackagePath, type: . exclusive) {
176166 // download the package into the cache unless already exists
177167 if try self . fileSystem. validPackageDirectory ( cachedPackagePath) {
178168 // extra validation to defend from racy edge cases
@@ -182,33 +172,39 @@ public class RegistryDownloadsManager: AsyncCancellable {
182172 // copy the package from the cache into the package path.
183173 try self . fileSystem. createDirectory ( packagePath. parentDirectory, recursive: true )
184174 try self . fileSystem. copy ( from: cachedPackagePath, to: packagePath)
185- completion ( . success ( . init ( fromCache: true , updatedCache: false ) ) )
175+ return FetchDetails ( fromCache: true , updatedCache: false )
186176 } else {
187- // it is possible that we already created the directory before from failed attempts, so clear leftover data if present.
188- try ? self . fileSystem. removeFileTree ( cachedPackagePath)
189- // download the package from the registry
190- self . registryClient. downloadSourceArchive (
191- package : package ,
192- version: version,
193- destinationPath: cachedPackagePath,
194- progressHandler: updateDownloadProgress,
195- fileSystem: self . fileSystem,
196- observabilityScope: observabilityScope,
197- callbackQueue: callbackQueue
198- ) { result in
199- completion ( result. tryMap {
200- // extra validation to defend from racy edge cases
201- if self . fileSystem. exists ( packagePath) {
202- throw StringError ( " \( packagePath) already exists unexpectedly " )
203- }
204- // copy the package from the cache into the package path.
205- try self . fileSystem. createDirectory ( packagePath. parentDirectory, recursive: true )
206- try self . fileSystem. copy ( from: cachedPackagePath, to: packagePath)
207- return FetchDetails ( fromCache: true , updatedCache: true )
208- } )
177+ do {
178+ // it is possible that we already created the directory before from failed attempts, so clear leftover data if present.
179+ try ? self . fileSystem. removeFileTree ( cachedPackagePath)
180+ // download the package from the registry
181+ let _ = try await self . registryClient. downloadSourceArchive (
182+ package : package ,
183+ version: version,
184+ destinationPath: cachedPackagePath,
185+ progressHandler: updateDownloadProgress,
186+ fileSystem: self . fileSystem,
187+ observabilityScope: observabilityScope,
188+ callbackQueue: callbackQueue
189+ )
190+
191+ // extra validation to defend from racy edge cases
192+ if self . fileSystem. exists ( packagePath) {
193+ throw StringError ( " \( packagePath) already exists unexpectedly " )
194+ }
195+ // copy the package from the cache into the package path.
196+ try self . fileSystem. createDirectory ( packagePath. parentDirectory, recursive: true )
197+ try self . fileSystem. copy ( from: cachedPackagePath, to: packagePath)
198+ return FetchDetails ( fromCache: true , updatedCache: true )
199+ } catch {
200+ // Wrap this error to do a straight rethrow instead of handling it as if
201+ // the download should be made without populating the cache.
202+ throw DownloadError . passthrough ( error)
209203 }
210204 }
211205 }
206+ } catch DownloadError . passthrough( let underlyingError) {
207+ throw underlyingError
212208 } catch {
213209 // download without populating the cache in the case of an error.
214210 observabilityScope. emit (
@@ -217,52 +213,56 @@ public class RegistryDownloadsManager: AsyncCancellable {
217213 )
218214 // it is possible that we already created the directory from failed attempts, so clear leftover data if present.
219215 try ? self . fileSystem. removeFileTree ( packagePath)
220- self . registryClient. downloadSourceArchive (
216+ let _ = try await self . registryClient. downloadSourceArchive (
221217 package : package ,
222218 version: version,
223219 destinationPath: packagePath,
224220 progressHandler: updateDownloadProgress,
225221 fileSystem: self . fileSystem,
226222 observabilityScope: observabilityScope,
227223 callbackQueue: callbackQueue
228- ) { result in
229- completion ( result. map { FetchDetails ( fromCache: false , updatedCache: false ) } )
230- }
224+ )
225+ return FetchDetails ( fromCache: false , updatedCache: false )
231226 }
232227 } else {
233228 // it is possible that we already created the directory from failed attempts, so clear leftover data if present.
234229 try ? self . fileSystem. removeFileTree ( packagePath)
230+
235231 // download without populating the cache when no `cachePath` is set.
236- self . registryClient. downloadSourceArchive (
232+ let _ = try await self . registryClient. downloadSourceArchive (
237233 package : package ,
238234 version: version,
239235 destinationPath: packagePath,
240236 progressHandler: updateDownloadProgress,
241237 fileSystem: self . fileSystem,
242238 observabilityScope: observabilityScope,
243239 callbackQueue: callbackQueue
244- ) { result in
245- completion ( result. map { FetchDetails ( fromCache: false , updatedCache: false ) } )
246- }
240+ )
241+ return FetchDetails ( fromCache: false , updatedCache: false )
247242 }
248243
249244 // utility to update progress
250245
251246 @Sendable func updateDownloadProgress( downloaded: Int64 , total: Int64 ? ) {
252- delegateQueue. async {
253- self . delegate? . fetching (
247+ delegateQueue. async { [ delegate = self . delegate ] in
248+ delegate? . fetching (
254249 package : package ,
255250 version: version,
256251 bytesDownloaded: downloaded,
257252 totalBytesToDownload: total
258253 )
259254 }
260255 }
256+
257+ enum DownloadError : Error {
258+ case passthrough( Error )
259+ }
261260 }
262261
263262 public func remove( package : PackageIdentity ) throws {
264263 let relativePath = try package . downloadPath ( )
265264 let packagesPath = self . path. appending ( relativePath)
265+ self . pendingLookups. removeValue ( forPackage: package )
266266 try self . fileSystem. removeFileTree ( packagesPath)
267267 }
268268
@@ -314,10 +314,25 @@ public class RegistryDownloadsManager: AsyncCancellable {
314314 try self . fileSystem. createDirectory ( cachePath, recursive: true )
315315 }
316316 }
317+
318+ private func executeAsync< T> (
319+ _ callback: @escaping ( Result < T , Error > ) -> Void ,
320+ on queue: DispatchQueue ,
321+ _ closure: @escaping ( ) async throws -> T
322+ ) {
323+ let completion : ( Result < T , Error > ) -> Void = { result in queue. async { callback ( result) } }
324+ Task {
325+ do {
326+ completion ( . success( try await closure ( ) ) )
327+ } catch {
328+ completion ( . failure( error) )
329+ }
330+ }
331+ }
317332}
318333
319334/// Delegate to notify clients about actions being performed by RegistryManager.
320- public protocol RegistryDownloadsManagerDelegate {
335+ public protocol RegistryDownloadsManagerDelegate : Sendable {
321336 /// Called when a package is about to be fetched.
322337 func willFetch( package : PackageIdentity , version: Version , fetchDetails: RegistryDownloadsManager . FetchDetails )
323338
@@ -333,9 +348,17 @@ public protocol RegistryDownloadsManagerDelegate {
333348 func fetching( package : PackageIdentity , version: Version , bytesDownloaded: Int64 , totalBytesToDownload: Int64 ? )
334349}
335350
351+ extension Dictionary where Key == RegistryDownloadsManager . PackageLookup {
352+ fileprivate mutating func removeValue( forPackage package : PackageIdentity ) {
353+ self . keys
354+ . filter { $0. package == package }
355+ . forEach { self . removeValue ( forKey: $0) }
356+ }
357+ }
358+
336359extension RegistryDownloadsManager {
337360 /// Additional information about a fetch
338- public struct FetchDetails : Equatable {
361+ public struct FetchDetails : Equatable , Sendable {
339362 /// Indicates if the repository was fetched from the cache or from the remote.
340363 public let fromCache : Bool
341364 /// Indicates whether the repository was already present in the cache and updated or if a clean fetch was performed.
0 commit comments