[fix]: eliminate race condition in SocketController (#769)

* [fix]: eliminate race condition in SocketController

* [refactor]: use sequence- rather than iterator-based iteration

* [fix]: remove now-superfluous await

---------

Co-authored-by: Max Goedjen <max.goedjen@gmail.com>
This commit is contained in:
Jamie 2025-12-06 17:27:45 -06:00 committed by GitHub
parent a3bfcb316c
commit d82f404166
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 11 deletions

View File

@ -37,7 +37,13 @@ public struct SocketController {
port = SocketPort(path: path) port = SocketPort(path: path)
fileHandle = FileHandle(fileDescriptor: port.socket, closeOnDealloc: true) fileHandle = FileHandle(fileDescriptor: port.socket, closeOnDealloc: true)
Task { @MainActor [fileHandle, sessionsContinuation, logger] in Task { @MainActor [fileHandle, sessionsContinuation, logger] in
for await notification in NotificationCenter.default.notifications(named: .NSFileHandleConnectionAccepted) { // Create the sequence before triggering the notification to
// ensure it will not be missed.
let connectionAcceptedNotifications = NotificationCenter.default.notifications(named: .NSFileHandleConnectionAccepted)
fileHandle.acceptConnectionInBackgroundAndNotify()
for await notification in connectionAcceptedNotifications {
logger.debug("Socket controller accepted connection") logger.debug("Socket controller accepted connection")
guard let new = notification.userInfo?[NSFileHandleNotificationFileHandleItem] as? FileHandle else { continue } guard let new = notification.userInfo?[NSFileHandleNotificationFileHandleItem] as? FileHandle else { continue }
let session = Session(fileHandle: new) let session = Session(fileHandle: new)
@ -45,7 +51,6 @@ public struct SocketController {
fileHandle.acceptConnectionInBackgroundAndNotify() fileHandle.acceptConnectionInBackgroundAndNotify()
} }
} }
fileHandle.acceptConnectionInBackgroundAndNotify()
logger.debug("Socket listening at \(path)") logger.debug("Socket listening at \(path)")
} }
@ -77,8 +82,14 @@ extension SocketController {
self.fileHandle = fileHandle self.fileHandle = fileHandle
provenance = SigningRequestTracer().provenance(from: fileHandle) provenance = SigningRequestTracer().provenance(from: fileHandle)
(messages, messagesContinuation) = AsyncStream.makeStream() (messages, messagesContinuation) = AsyncStream.makeStream()
Task { [messagesContinuation, logger] in Task { @MainActor [messagesContinuation, logger] in
for await _ in NotificationCenter.default.notifications(named: .NSFileHandleDataAvailable, object: fileHandle) { // Create the sequence before triggering the notification to
// ensure it will not be missed.
let dataAvailableNotifications = NotificationCenter.default.notifications(named: .NSFileHandleDataAvailable, object: fileHandle)
fileHandle.waitForDataInBackgroundAndNotify()
for await _ in dataAvailableNotifications {
let data = fileHandle.availableData let data = fileHandle.availableData
guard !data.isEmpty else { guard !data.isEmpty else {
logger.debug("Socket controller received empty data, ending continuation.") logger.debug("Socket controller received empty data, ending continuation.")
@ -90,17 +101,14 @@ extension SocketController {
logger.debug("Socket controller yielded data.") logger.debug("Socket controller yielded data.")
} }
} }
fileHandle.waitForDataInBackgroundAndNotify()
} }
/// Writes new data to the socket. /// Writes new data to the socket.
/// - Parameter data: The data to write. /// - Parameter data: The data to write.
public func write(_ data: Data) async throws { @MainActor public func write(_ data: Data) throws {
try fileHandle.write(contentsOf: data) try fileHandle.write(contentsOf: data)
await MainActor.run {
fileHandle.waitForDataInBackgroundAndNotify() fileHandle.waitForDataInBackgroundAndNotify()
} }
}
/// Closes the socket and cleans up resources. /// Closes the socket and cleans up resources.
public func close() throws { public func close() throws {

View File

@ -42,7 +42,7 @@ class AppDelegate: NSObject, NSApplicationDelegate {
for await message in session.messages { for await message in session.messages {
let request = try await inputParser.parse(data: message) let request = try await inputParser.parse(data: message)
let agentResponse = await agent.handle(request: request, provenance: session.provenance) let agentResponse = await agent.handle(request: request, provenance: session.provenance)
try await session.write(agentResponse) try session.write(agentResponse)
} }
} catch { } catch {
try session.close() try session.close()