Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 196 additions & 110 deletions Sources/SPMBuildCore/Plugins/DefaultPluginScriptRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable {

return sdkRootPath
}

/// Private function that invokes a compiled plugin executable and communicates with it until it finishes.
fileprivate func invoke(
compiledExec: Basics.AbsolutePath,
Expand All @@ -433,30 +433,47 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable {
delegate: PluginScriptRunnerDelegate,
completion: @escaping (Result<Int32, Error>) -> Void
) {
#if canImport(Darwin) && !os(macOS)
callbackQueue.async {
completion(.failure(DefaultPluginScriptRunnerError.pluginUnavailable(reason: "subprocess invocations are unavailable on this platform")))
Task<Void, Never> {
let result: Result<Int32, any Error>
do {
result = try await .success(invoke(compiledExec: compiledExec, workingDirectory: workingDirectory, writableDirectories: writableDirectories, readOnlyDirectories: readOnlyDirectories, allowNetworkConnections: allowNetworkConnections, initialMessage: initialMessage, observabilityScope: observabilityScope, callbackQueue: callbackQueue, delegate: delegate))
} catch {
result = .failure(error)
}
callbackQueue.async {
completion(result)
}
}
}

/// Private function that invokes a compiled plugin executable and communicates with it until it finishes.
fileprivate func invoke(
compiledExec: Basics.AbsolutePath,
workingDirectory: Basics.AbsolutePath,
writableDirectories: [Basics.AbsolutePath],
readOnlyDirectories: [Basics.AbsolutePath],
allowNetworkConnections: [SandboxNetworkPermission],
initialMessage: Data,
observabilityScope: ObservabilityScope,
callbackQueue: DispatchQueue,
delegate: PluginScriptRunnerDelegate
) async throws -> Int32 {
#if canImport(Darwin) && !os(macOS)
throw DefaultPluginScriptRunnerError.pluginUnavailable(reason: "subprocess invocations are unavailable on this platform")
#else
// Construct the command line. Currently we just invoke the executable built from the plugin without any parameters.
var command = [compiledExec.pathString]

// Optionally wrap the command in a sandbox, which places some limits on what it can do. In particular, it blocks network access and restricts the paths to which the plugin can make file system changes. It does allow writing to temporary directories.
if self.enableSandbox {
do {
command = try Sandbox.apply(
command: command,
fileSystem: self.fileSystem,
strictness: .writableTemporaryDirectory,
writableDirectories: writableDirectories + [self.cacheDir],
readOnlyDirectories: readOnlyDirectories,
allowNetworkConnections: allowNetworkConnections
)
} catch {
return callbackQueue.async {
completion(.failure(error))
}
}
command = try Sandbox.apply(
command: command,
fileSystem: self.fileSystem,
strictness: .writableTemporaryDirectory,
writableDirectories: writableDirectories + [self.cacheDir],
readOnlyDirectories: readOnlyDirectories,
allowNetworkConnections: allowNetworkConnections
)
}

// Create and configure a Process. We set the working directory to the cache directory, so that relative paths end up there.
Expand Down Expand Up @@ -484,121 +501,88 @@ public struct DefaultPluginScriptRunner: PluginScriptRunner, Cancellable {
// Set up a pipe for sending structured messages to the plugin on its stdin.
let stdinPipe = Pipe()
let outputHandle = stdinPipe.fileHandleForWriting
defer {
// Close the output handle through which we talked to the plugin.
try? outputHandle.close()
}
let outputQueue = DispatchQueue(label: "plugin-send-queue")
process.standardInput = stdinPipe

// Set up a pipe for receiving messages from the plugin on its stdout.
let stdoutPipe = Pipe()
let stdoutLock = NSLock()
stdoutPipe.fileHandleForReading.readabilityHandler = { fileHandle in
// Receive the next message and pass it on to the delegate.
stdoutLock.withLock {
do {
while let message = try fileHandle.readPluginMessage() {
// FIXME: We should handle errors here.
callbackQueue.async {
do {
try delegate.handleMessage(data: message, responder: { data in
outputQueue.async {
do {
try outputHandle.writePluginMessage(data)
}
catch {
print("error while trying to send message to plugin: \(error.interpolationDescription)")
}
}
})
}
catch DecodingError.keyNotFound(let key, _) where key.stringValue == "version" {
print("message from plugin did not contain a 'version' key, likely an incompatible plugin library is being loaded by the plugin")
}
catch {
print("error while trying to handle message from plugin: \(error.interpolationDescription)")
process.standardOutput = stdoutPipe
async let stdout: () = {
while let message = try await stdoutPipe.fileHandleForReading.readPluginMessage() {
// FIXME: We should handle errors here.
callbackQueue.async {
do {
try delegate.handleMessage(data: message, responder: { data in
outputQueue.async {
do {
try outputHandle.writePluginMessage(data)
}
catch {
print("error while trying to send message to plugin: \(error.interpolationDescription)")
}
}
}
})
}
catch DecodingError.keyNotFound(let key, _) where key.stringValue == "version" {
print("message from plugin did not contain a 'version' key, likely an incompatible plugin library is being loaded by the plugin")
}
catch {
print("error while trying to handle message from plugin: \(error.interpolationDescription)")
}
}
catch {
print("error while trying to read message from plugin: \(error.interpolationDescription)")
}
}
}
process.standardOutput = stdoutPipe
}()

// Set up a pipe for receiving free-form text output from the plugin on its stderr.
let stderrPipe = Pipe()
let stderrLock = NSLock()
var stderrData = Data()
let stderrHandler = { (data: Data) in
// Pass on any available data to the delegate.
if data.isEmpty { return }
stderrData.append(contentsOf: data)
callbackQueue.async { delegate.handleOutput(data: data) }
}
stderrPipe.fileHandleForReading.readabilityHandler = { fileHandle in
// Read and pass on any available free-form text output from the plugin.
// We need the lock since we could run concurrently with the termination handler.
stderrLock.withLock { stderrHandler(fileHandle.availableData) }
}
process.standardError = stderrPipe

async let stderrData = {
var accumulatedData = Data()
for try await chunk in stderrPipe.fileHandleForReading._dataStream() {
callbackQueue.async { delegate.handleOutput(data: Data(chunk)) }
accumulatedData.append(contentsOf: chunk)
}
return accumulatedData
}()

// Add it to the list of currently running plugin processes, so it can be cancelled if the host is interrupted.
guard let cancellationKey = self.cancellator.register(process) else {
return callbackQueue.async {
completion(.failure(CancellationError()))
}
throw CancellationError()
}

// Set up a handler to deal with the exit of the plugin process.
process.terminationHandler = { process in
defer {
// Remove the process from the list of currently running ones.
self.cancellator.deregister(cancellationKey)
}

// Close the output handle through which we talked to the plugin.
try? outputHandle.close()

// Read and pass on any remaining free-form text output from the plugin.
// We need the lock since we could run concurrently with the readability handler.
stderrLock.withLock {
try? stderrPipe.fileHandleForReading.readToEnd().map{ stderrHandler($0) }
}

// Read and pass on any remaining messages from the plugin.
let handle = stdoutPipe.fileHandleForReading
if let handler = handle.readabilityHandler {
handler(handle)
}

// Call the completion block with a result that depends on how the process ended.
callbackQueue.async {
completion(Result {
// We throw an error if the plugin ended with a signal.
if process.terminationReason == .uncaughtSignal {
throw DefaultPluginScriptRunnerError.invocationEndedBySignal(
signal: process.terminationStatus,
command: command,
output: String(decoding: stderrData, as: UTF8.self))
}
// Otherwise return the termination satatus.
return process.terminationStatus
})
}
/// Send the initial message to the plugin.
outputQueue.async {
try? outputHandle.writePluginMessage(initialMessage)
}

// Start the plugin process.
do {
try process.run()
}
catch {
callbackQueue.async {
completion(.failure(DefaultPluginScriptRunnerError.invocationFailed(error: error, command: command)))
}
try await process.run()
} catch {
throw DefaultPluginScriptRunnerError.invocationFailed(error: error, command: command)
}

/// Send the initial message to the plugin.
outputQueue.async {
try? outputHandle.writePluginMessage(initialMessage)
_ = try await stdout
let stderr = try await stderrData

// We throw an error if the plugin ended with a signal.
if process.terminationReason == .uncaughtSignal {
throw DefaultPluginScriptRunnerError.invocationEndedBySignal(
signal: process.terminationStatus,
command: command,
output: String(decoding: stderr, as: UTF8.self))
}
// Otherwise return the termination satatus.
return process.terminationStatus
#endif
}

Expand Down Expand Up @@ -667,9 +651,12 @@ fileprivate extension FileHandle {
try self.write(contentsOf: message)
}

func readPluginMessage() throws -> Data? {
func readPluginMessage() async throws -> Data? {
// Read the header (a 64-bit length field in little endian byte order).
guard let header = try self.read(upToCount: 8) else { return nil }
let header = try await Data(self.readChunk(upToLength: 8))
if header.isEmpty {
return nil
}
guard header.count == 8 else {
throw PluginMessageError.truncatedHeader
}
Expand All @@ -679,7 +666,8 @@ fileprivate extension FileHandle {
}

// Read and return the message.
guard let message = try self.read(upToCount: Int(length)), message.count == length else {
let message = try await Data(self.readChunk(upToLength: Int(length)))
guard message.count == length else {
throw PluginMessageError.truncatedPayload
}
return message
Expand All @@ -691,3 +679,101 @@ fileprivate extension FileHandle {
case truncatedPayload
}
}

extension Process {
/// Runs the process and suspends until completion.
///
/// - parameter interruptible: Whether the process should respond to task cancellation. If `true`, task cancellation will cause `SIGTERM` to be sent to the process if it starts running by the time the task is cancelled. If `false`, the process will always run to completion regardless of the task's cancellation status.
///
/// - note: This method sets the process's termination handler, if one is set.
/// - throws: ``CancellationError`` if the task was cancelled. Applies only when `interruptible` is true.
/// - throws: Rethrows the error from ``Process/run`` if the task could not be launched.
public func run(interruptible: Bool = true) async throws {
@Sendable func cancelIfRunning() {
// Only send the termination signal if the process is already running.
// We might have created the termination monitoring continuation at this
// point but not yet completed run(), and terminate() will raise an
// Objective-C exception if the process has not yet started.
if interruptible && isRunning {
terminate()
}
}
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
terminationHandler = { _ in
continuation.resume()
}

do {
// Check for cancellation -- if we entered the cancellation
// handler before the process actually started, and therefore
// didn't call terminate(), don't actually start it.
if interruptible {
try Task.checkCancellation()
}

try run()
} catch {
terminationHandler = nil

// If `run` throws, the terminationHandler won't be called,
// so resume the continuation with this error.
continuation.resume(throwing: error)
}

if Task.isCancelled {
cancelIfRunning()
}
}

// Check for cancellation -- if terminate() was called, the termination
// handler will have resumed the previous continuation without throwing
// an error; this distinguishes cooperative cancellation from successful
// execution of the task to completion (even if that task otherwise exited
// with a non-zero exit code or terminated due to an uncaught signal).
if interruptible {
try Task.checkCancellation()
}
} onCancel: {
cancelIfRunning()
}
}
}

extension FileHandle {
public func _dataStream() -> AsyncThrowingStream<DispatchData, any Error> {
AsyncThrowingStream<DispatchData, any Error> {
while !Task.isCancelled {
let chunk = try await self.readChunk(upToLength: 4096)
if chunk.isEmpty {
return nil
}
return chunk
}
throw CancellationError()
}
}
}

extension FileHandle {
public func readChunk(upToLength maxLength: Int) async throws -> DispatchData {
return try await withCheckedThrowingContinuation { continuation in
#if os(Windows)
let fd = _get_osfhandle(fileDescriptor.rawValue)
#else
let fd = self.fileDescriptor
#endif
DispatchIO.read(
fromFileDescriptor: fd,
maxLength: maxLength,
runningHandlerOn: .global()
) { data, error in
if error != 0 {
continuation.resume(throwing: POSIXError(POSIXError.Code(rawValue: error)!))
return
}
continuation.resume(returning: data)
}
}
}
}