@@ -514,15 +514,18 @@ package final class AsyncProcess {
514514 if self . outputRedirection. redirectsOutput {
515515 let stdoutPipe = Pipe ( )
516516 let stderrPipe = Pipe ( )
517+ let stdoutStream = DispatchFD ( fileHandle: stdoutPipe. fileHandleForReading) . dataStream ( )
518+ let stderrStream = DispatchFD ( fileHandle: stderrPipe. fileHandleForReading) . dataStream ( )
517519
518520 group. enter ( )
519- stdoutPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
520- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
521- if data. count == 0 {
522- stdoutPipe. fileHandleForReading. readabilityHandler = nil
521+ Task {
522+ defer {
523+ print ( " --- finished consuming stdout --- " )
523524 group. leave ( )
524- } else {
525- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
525+ }
526+ print ( " --- started consuming stdout --- " )
527+ for try await data in stdoutStream {
528+ let contents = [ UInt8] ( data)
526529 self . outputRedirection. outputClosures? . stdoutClosure ( contents)
527530 stdoutLock. withLock {
528531 stdout += contents
@@ -531,13 +534,14 @@ package final class AsyncProcess {
531534 }
532535
533536 group. enter ( )
534- stderrPipe. fileHandleForReading. readabilityHandler = { ( fh: FileHandle ) in
535- let data = ( try ? fh. read ( upToCount: Int . max) ) ?? Data ( )
536- if data. count == 0 {
537- stderrPipe. fileHandleForReading. readabilityHandler = nil
537+ Task {
538+ defer {
539+ print ( " --- finished consuming stderr --- " )
538540 group. leave ( )
539- } else {
540- let contents = data. withUnsafeBytes { [ UInt8] ( $0) }
541+ }
542+ print ( " --- started consuming stderr --- " )
543+ for try await data in stderrStream {
544+ let contents = [ UInt8] ( data)
541545 self . outputRedirection. outputClosures? . stderrClosure ( contents)
542546 stderrLock. withLock {
543547 stderr += contents
@@ -557,6 +561,7 @@ package final class AsyncProcess {
557561 }
558562
559563 group. notify ( queue: self . completionQueue) {
564+ print ( " --- notified that output is ready --- " )
560565 self . stateLock. withLock {
561566 self . state = . outputReady( stdout: . success( stdout) , stderr: . success( stderr) )
562567 }
@@ -820,6 +825,7 @@ package final class AsyncProcess {
820825 /// Executes the process I/O state machine, calling completion block when finished.
821826 private func waitUntilExit( _ completion: @escaping ( Result < AsyncProcessResult , Swift . Error > ) -> Void ) {
822827 self . stateLock. lock ( )
828+ print ( " --- waitUntilExit called: \( self . state) --- " )
823829 switch self . state {
824830 case . idle:
825831 defer { self . stateLock. unlock ( ) }
@@ -832,16 +838,19 @@ package final class AsyncProcess {
832838 completion ( . failure( error) )
833839 case . readingOutput( let sync) :
834840 self . stateLock. unlock ( )
841+ print ( " --- queing up waitUntilExit block --- " )
835842 sync. notify ( queue: self . completionQueue) {
843+ print ( " --- was notified we should enter waitUntilExit again --- " )
836844 self . waitUntilExit ( completion)
837845 }
838846 case . outputReady( let stdoutResult, let stderrResult) :
839- defer { self . stateLock. unlock ( ) }
840847 // Wait until process finishes execution.
841848 #if os(Windows)
842849 precondition ( self . _process != nil , " The process is not yet launched. " )
843850 let p = self . _process!
851+ self . stateLock. unlock ( )
844852 p. waitUntilExit ( )
853+ self . stateLock. lock ( )
845854 let exitStatusCode = p. terminationStatus
846855 let normalExit = p. terminationReason == . exit
847856 #else
@@ -866,6 +875,7 @@ package final class AsyncProcess {
866875 stderrOutput: stderrResult
867876 )
868877 self . state = . complete( executionResult)
878+ self . stateLock. unlock ( )
869879 self . completionQueue. async {
870880 self . waitUntilExit ( completion)
871881 }
@@ -1354,3 +1364,51 @@ extension FileHandle: WritableByteStream {
13541364 }
13551365}
13561366#endif
1367+
1368+ extension DispatchFD {
1369+ public func readChunk( upToLength maxLength: Int ) async throws -> DispatchData {
1370+ return try await withCheckedThrowingContinuation { continuation in
1371+ DispatchIO . read ( fromFileDescriptor: numericCast ( self . rawValue) , maxLength: maxLength, runningHandlerOn: DispatchQueue . global ( ) )
1372+ { data, error in
1373+ if error != 0 {
1374+ continuation. resume ( throwing: StringError ( " POSIX error: \( error) " ) )
1375+ return
1376+ }
1377+ continuation. resume ( returning: data)
1378+ }
1379+ }
1380+
1381+ }
1382+
1383+ /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
1384+ @available ( macOS 15 . 0 , iOS 18 . 0 , tvOS 18 . 0 , watchOS 11 . 0 , visionOS 2 . 0 , * )
1385+ public func dataStream( ) -> some AsyncSequence < DispatchData , any Error > {
1386+ AsyncThrowingStream < DispatchData , any Error > {
1387+ while !Task. isCancelled {
1388+ let chunk = try await readChunk ( upToLength: 4096 )
1389+ if chunk. isEmpty {
1390+ return nil
1391+ }
1392+ return chunk
1393+ }
1394+ throw CancellationError ( )
1395+ }
1396+ }
1397+ }
1398+
1399+ public struct DispatchFD {
1400+ #if os(Windows)
1401+ fileprivate let rawValue : Int
1402+ #else
1403+ fileprivate let rawValue : Int32
1404+ #endif
1405+
1406+ init ( fileHandle: FileHandle ) {
1407+ #if os(Windows)
1408+ // This may look unsafe, but is how swift-corelibs-dispatch works. Basically, dispatch_fd_t directly represents either a POSIX file descriptor OR a Windows HANDLE pointer address, meaning that the fileDescriptor parameter of various Dispatch APIs is actually NOT a file descriptor on Windows but rather a HANDLE. This means that the handle should NOT be converted using _open_osfhandle, and the return value of this function should ONLY be passed to Dispatch functions where the fileDescriptor parameter is masquerading as a HANDLE in this manner. Use with extreme caution.
1409+ rawValue = . init( bitPattern: fileHandle. _handle)
1410+ #else
1411+ rawValue = fileHandle. fileDescriptor
1412+ #endif
1413+ }
1414+ }
0 commit comments