diff --git a/Sources/CoreFoundation/CFSocket.c b/Sources/CoreFoundation/CFSocket.c index 4fe5e90b13..18bddcdc78 100644 --- a/Sources/CoreFoundation/CFSocket.c +++ b/Sources/CoreFoundation/CFSocket.c @@ -215,13 +215,30 @@ CF_INLINE int __CFSocketLastError(void) { } CF_INLINE CFIndex __CFSocketFdGetSize(CFDataRef fdSet) { +#if TARGET_OS_WIN32 + if (CFDataGetLength(fdSet) == 0) { + return 0; + } + return FD_SETSIZE; +#else return NBBY * CFDataGetLength(fdSet); +#endif } CF_INLINE Boolean __CFSocketFdSet(CFSocketNativeHandle sock, CFMutableDataRef fdSet) { /* returns true if a change occurred, false otherwise */ Boolean retval = false; if (INVALID_SOCKET != sock && 0 <= sock) { + fd_set *fds; +#if TARGET_OS_WIN32 + if (CFDataGetLength(fdSet) == 0) { + CFDataIncreaseLength(fdSet, sizeof(fd_set)); + fds = (fd_set *)CFDataGetMutableBytePtr(fdSet); + FD_ZERO(fds); + } else { + fds = (fd_set *)CFDataGetMutableBytePtr(fdSet); + } +#else CFIndex numFds = NBBY * CFDataGetLength(fdSet); fd_mask *fds_bits; if (sock >= numFds) { @@ -232,9 +249,11 @@ CF_INLINE Boolean __CFSocketFdSet(CFSocketNativeHandle sock, CFMutableDataRef fd } else { fds_bits = (fd_mask *)CFDataGetMutableBytePtr(fdSet); } - if (!FD_ISSET(sock, (fd_set *)fds_bits)) { + fds = (fd_set *)fds_bits; +#endif + if (!FD_ISSET(sock, fds)) { retval = true; - FD_SET(sock, (fd_set *)fds_bits); + FD_SET(sock, fds); } } return retval; @@ -418,6 +437,15 @@ CF_INLINE Boolean __CFSocketFdClr(CFSocketNativeHandle sock, CFMutableDataRef fd /* returns true if a change occurred, false otherwise */ Boolean retval = false; if (INVALID_SOCKET != sock && 0 <= sock) { +#if TARGET_OS_WIN32 + if (CFDataGetLength(fdSet) > 0) { + fd_set *fds = (fd_set *)CFDataGetMutableBytePtr(fdSet); + if (FD_ISSET(sock, fds)) { + retval = true; + FD_CLR(sock, fds); + } + } +#else CFIndex numFds = NBBY * CFDataGetLength(fdSet); fd_mask *fds_bits; if (sock < numFds) { @@ -427,6 +455,7 @@ CF_INLINE Boolean __CFSocketFdClr(CFSocketNativeHandle sock, CFMutableDataRef fd FD_CLR(sock, (fd_set *)fds_bits); } } +#endif } return retval; } @@ -1190,6 +1219,27 @@ static void clearInvalidFileDescriptors(CFMutableDataRef d) { if (d) { +#if TARGET_OS_WIN32 + if (CFDataGetLength(d) == 0) { + return; + } + + fd_set *fds = (fd_set *)CFDataGetMutableBytePtr(d); + fd_set invalidFds; + FD_ZERO(&invalidFds); + // Gather all invalid sockets into invalidFds set + for (u_int idx = 0; idx < fds->fd_count; idx++) { + SOCKET socket = fds->fd_array[idx]; + if (! __CFNativeSocketIsValid(socket)) { + FD_SET(socket, &invalidFds); + } + } + // Remove invalid sockets from source set + for (u_int idx = 0; idx < invalidFds.fd_count; idx++) { + SOCKET socket = invalidFds.fd_array[idx]; + FD_CLR(socket, fds); + } +#else SInt32 count = __CFSocketFdGetSize(d); fd_set* s = (fd_set*) CFDataGetMutableBytePtr(d); for (SInt32 idx = 0; idx < count; idx++) { @@ -1198,14 +1248,13 @@ clearInvalidFileDescriptors(CFMutableDataRef d) FD_CLR(idx, s); } } +#endif } } static void -manageSelectError() +manageSelectError(SInt32 selectError) { - SInt32 selectError = __CFSocketLastError(); - __CFSOCKETLOG("socket manager received error %ld from select", (long)selectError); if (EBADF == selectError) { @@ -1265,8 +1314,15 @@ static void *__CFSocketManager(void * arg) SInt32 nrfds, maxnrfds, fdentries = 1; SInt32 rfds, wfds; fd_set *exceptfds = NULL; +#if TARGET_OS_WIN32 + fd_set *writefds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, sizeof(fd_set), 0); + fd_set *readfds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, sizeof(fd_set), 0); + FD_ZERO(writefds); + FD_ZERO(readfds); +#else fd_set *writefds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, fdentries * sizeof(fd_mask), 0); fd_set *readfds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, fdentries * sizeof(fd_mask), 0); +#endif fd_set *tempfds; SInt32 idx, cnt; uint8_t buffer[256]; @@ -1292,6 +1348,11 @@ static void *__CFSocketManager(void * arg) free(readBuffer); free(writeBuffer); #endif + +#if TARGET_OS_WIN32 + // This parameter is ignored by `select` from Winsock2 API + maxnrfds = INT_MAX; +#else rfds = __CFSocketFdGetSize(__CFReadSocketsFds); wfds = __CFSocketFdGetSize(__CFWriteSocketsFds); maxnrfds = __CFMax(rfds, wfds); @@ -1302,6 +1363,7 @@ static void *__CFSocketManager(void * arg) } memset(writefds, 0, fdentries * sizeof(fd_mask)); memset(readfds, 0, fdentries * sizeof(fd_mask)); +#endif CFDataGetBytes(__CFWriteSocketsFds, CFRangeMake(0, CFDataGetLength(__CFWriteSocketsFds)), (UInt8 *)writefds); CFDataGetBytes(__CFReadSocketsFds, CFRangeMake(0, CFDataGetLength(__CFReadSocketsFds)), (UInt8 *)readfds); @@ -1347,7 +1409,13 @@ static void *__CFSocketManager(void * arg) } #endif + SInt32 error = 0; nrfds = select(maxnrfds, readfds, writefds, exceptfds, pTimeout); + if (nrfds < 0) { + // Store error as early as possible, as the code below could + // reset it and make late check unreliable. + error = __CFSocketLastError(); + } #if defined(LOG_CFSOCKET) && defined(DEBUG_POLLING_SELECT) __CFSOCKETLOG("socket manager woke from select, ret=%ld", (long)nrfds); @@ -1436,7 +1504,7 @@ static void *__CFSocketManager(void * arg) } if (0 > nrfds) { - manageSelectError(); + manageSelectError(error); continue; } if (FD_ISSET(__CFWakeupSocketPair[1], readfds)) { diff --git a/Sources/Foundation/NSData.swift b/Sources/Foundation/NSData.swift index 53098159d7..6125dc3491 100644 --- a/Sources/Foundation/NSData.swift +++ b/Sources/Foundation/NSData.swift @@ -421,8 +421,12 @@ open class NSData : NSObject, NSCopying, NSMutableCopying, NSSecureCoding { } let fm = FileManager.default +#if os(WASI) + // WASI does not have permission concept + let permissions: Int? = nil +#else let permissions = try? fm.attributesOfItem(atPath: path)[.posixPermissions] as? Int - +#endif if writeOptionsMask.contains(.atomic) { let (newFD, auxFilePath) = try _NSCreateTemporaryFile(path) let fh = FileHandle(fileDescriptor: newFD, closeOnDealloc: true) diff --git a/Sources/FoundationNetworking/DataURLProtocol.swift b/Sources/FoundationNetworking/DataURLProtocol.swift index 014f34b558..c4783b2dc4 100644 --- a/Sources/FoundationNetworking/DataURLProtocol.swift +++ b/Sources/FoundationNetworking/DataURLProtocol.swift @@ -91,8 +91,7 @@ internal class _DataURLProtocol: URLProtocol { urlClient.urlProtocolDidFinishLoading(self) } else { let error = NSError(domain: NSURLErrorDomain, code: NSURLErrorBadURL) - if let session = self.task?.session as? URLSession, let delegate = session.delegate as? URLSessionTaskDelegate, - let task = self.task { + if let task = self.task, let session = task.actualSession, let delegate = task.delegate { delegate.urlSession(session, task: task, didCompleteWithError: error) } } diff --git a/Sources/FoundationNetworking/URLSession/FTP/FTPURLProtocol.swift b/Sources/FoundationNetworking/URLSession/FTP/FTPURLProtocol.swift index 6e334bc9f4..95a093ed27 100644 --- a/Sources/FoundationNetworking/URLSession/FTP/FTPURLProtocol.swift +++ b/Sources/FoundationNetworking/URLSession/FTP/FTPURLProtocol.swift @@ -119,7 +119,7 @@ internal extension _FTPURLProtocol { switch session.behaviour(for: self.task!) { case .noDelegate: break - case .taskDelegate: + case .taskDelegate, .dataCompletionHandlerWithTaskDelegate, .downloadCompletionHandlerWithTaskDelegate: self.client?.urlProtocol(self, didReceive: response, cacheStoragePolicy: .notAllowed) case .dataCompletionHandler: break diff --git a/Sources/FoundationNetworking/URLSession/HTTP/HTTPURLProtocol.swift b/Sources/FoundationNetworking/URLSession/HTTP/HTTPURLProtocol.swift index 752b8a7a23..098db66dc8 100644 --- a/Sources/FoundationNetworking/URLSession/HTTP/HTTPURLProtocol.swift +++ b/Sources/FoundationNetworking/URLSession/HTTP/HTTPURLProtocol.swift @@ -475,7 +475,7 @@ internal class _HTTPURLProtocol: _NativeProtocol { guard let session = task?.session as? URLSession else { fatalError() } - if let delegate = session.delegate as? URLSessionTaskDelegate { + if let delegate = task?.delegate { // At this point we need to change the internal state to note // that we're waiting for the delegate to call the completion // handler. Then we'll call the delegate callback @@ -524,7 +524,9 @@ internal class _HTTPURLProtocol: _NativeProtocol { switch session.behaviour(for: self.task!) { case .noDelegate: break - case .taskDelegate: + case .taskDelegate, + .dataCompletionHandlerWithTaskDelegate, + .downloadCompletionHandlerWithTaskDelegate: //TODO: There's a problem with libcurl / with how we're using it. // We're currently unable to pause the transfer / the easy handle: // https://curl.haxx.se/mail/lib-2016-03/0222.html diff --git a/Sources/FoundationNetworking/URLSession/NativeProtocol.swift b/Sources/FoundationNetworking/URLSession/NativeProtocol.swift index 714cb5ec65..aa25807add 100644 --- a/Sources/FoundationNetworking/URLSession/NativeProtocol.swift +++ b/Sources/FoundationNetworking/URLSession/NativeProtocol.swift @@ -129,43 +129,59 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { } fileprivate func notifyDelegate(aboutReceivedData data: Data) { - guard let t = self.task else { + guard let task = self.task, let session = task.session as? URLSession else { fatalError("Cannot notify") } - if case .taskDelegate(let delegate) = t.session.behaviour(for: self.task!), - let dataDelegate = delegate as? URLSessionDataDelegate, - let task = self.task as? URLSessionDataTask { - // Forward to the delegate: - guard let s = self.task?.session as? URLSession else { - fatalError() - } - s.delegateQueue.addOperation { - dataDelegate.urlSession(s, dataTask: task, didReceive: data) - } - } else if case .taskDelegate(let delegate) = t.session.behaviour(for: self.task!), - let downloadDelegate = delegate as? URLSessionDownloadDelegate, - let task = self.task as? URLSessionDownloadTask { - guard let s = self.task?.session as? URLSession else { - fatalError() - } - let fileHandle = try! FileHandle(forWritingTo: self.tempFileURL) - _ = fileHandle.seekToEndOfFile() - fileHandle.write(data) - task.countOfBytesReceived += Int64(data.count) - s.delegateQueue.addOperation { - downloadDelegate.urlSession(s, downloadTask: task, didWriteData: Int64(data.count), totalBytesWritten: task.countOfBytesReceived, - totalBytesExpectedToWrite: task.countOfBytesExpectedToReceive) + switch task.session.behaviour(for: task) { + case .taskDelegate(let delegate), + .dataCompletionHandlerWithTaskDelegate(_, let delegate), + .downloadCompletionHandlerWithTaskDelegate(_, let delegate): + if let dataDelegate = delegate as? URLSessionDataDelegate, + let dataTask = task as? URLSessionDataTask { + session.delegateQueue.addOperation { + dataDelegate.urlSession(session, dataTask: dataTask, didReceive: data) + } + } else if let downloadDelegate = delegate as? URLSessionDownloadDelegate, + let downloadTask = task as? URLSessionDownloadTask { + let fileHandle = try! FileHandle(forWritingTo: self.tempFileURL) + _ = fileHandle.seekToEndOfFile() + fileHandle.write(data) + task.countOfBytesReceived += Int64(data.count) + session.delegateQueue.addOperation { + downloadDelegate.urlSession( + session, + downloadTask: downloadTask, + didWriteData: Int64(data.count), + totalBytesWritten: task.countOfBytesReceived, + totalBytesExpectedToWrite: task.countOfBytesExpectedToReceive + ) + } } + default: + break } } fileprivate func notifyDelegate(aboutUploadedData count: Int64) { - guard let task = self.task, let session = task.session as? URLSession, - case .taskDelegate(let delegate) = session.behaviour(for: task) else { return } - task.countOfBytesSent += count - session.delegateQueue.addOperation { - delegate.urlSession(session, task: task, didSendBodyData: count, - totalBytesSent: task.countOfBytesSent, totalBytesExpectedToSend: task.countOfBytesExpectedToSend) + guard let task = self.task, let session = task.session as? URLSession else { + return + } + switch session.behaviour(for: task) { + case .taskDelegate(let delegate), + .dataCompletionHandlerWithTaskDelegate(_, let delegate), + .downloadCompletionHandlerWithTaskDelegate(_, let delegate): + task.countOfBytesSent += count + session.delegateQueue.addOperation { + delegate.urlSession( + session, + task: task, + didSendBodyData: count, + totalBytesSent: task.countOfBytesSent, + totalBytesExpectedToSend: task.countOfBytesExpectedToSend + ) + } + default: + break } } @@ -284,7 +300,7 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { var currentInputStream: InputStream? - if let delegate = session.delegate as? URLSessionTaskDelegate { + if let delegate = task?.delegate { let dispatchGroup = DispatchGroup() dispatchGroup.enter() @@ -338,11 +354,13 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { // Data will be forwarded to the delegate as we receive it, we don't // need to do anything about it. return .ignore - case .dataCompletionHandler: + case .dataCompletionHandler, + .dataCompletionHandlerWithTaskDelegate: // Data needs to be concatenated in-memory such that we can pass it // to the completion handler upon completion. return .inMemory(nil) - case .downloadCompletionHandler: + case .downloadCompletionHandler, + .downloadCompletionHandlerWithTaskDelegate: // Data needs to be written to a file (i.e. a download task). let fileHandle = try! FileHandle(forWritingTo: self.tempFileURL) return .toFile(self.tempFileURL, fileHandle) diff --git a/Sources/FoundationNetworking/URLSession/TaskRegistry.swift b/Sources/FoundationNetworking/URLSession/TaskRegistry.swift index 33b637a358..57ada5e045 100644 --- a/Sources/FoundationNetworking/URLSession/TaskRegistry.swift +++ b/Sources/FoundationNetworking/URLSession/TaskRegistry.swift @@ -45,8 +45,12 @@ extension URLSession { case callDelegate /// Default action for all events, except for completion. case dataCompletionHandler(DataTaskCompletion) + /// Default action for all asynchronous events. + case dataCompletionHandlerWithTaskDelegate(DataTaskCompletion, URLSessionTaskDelegate?) /// Default action for all events, except for completion. case downloadCompletionHandler(DownloadTaskCompletion) + /// Default action for all asynchronous events. + case downloadCompletionHandlerWithTaskDelegate(DownloadTaskCompletion, URLSessionTaskDelegate?) } fileprivate var tasks: [Int: URLSessionTask] = [:] diff --git a/Sources/FoundationNetworking/URLSession/URLSession.swift b/Sources/FoundationNetworking/URLSession/URLSession.swift index ce533cb5a6..81d590c4e1 100644 --- a/Sources/FoundationNetworking/URLSession/URLSession.swift +++ b/Sources/FoundationNetworking/URLSession/URLSession.swift @@ -648,15 +648,31 @@ internal extension URLSession { /// Default action for all events, except for completion. /// - SeeAlso: URLSession.TaskRegistry.Behaviour.dataCompletionHandler case dataCompletionHandler(URLSession._TaskRegistry.DataTaskCompletion) + /// Default action for all asynchronous events. + /// - SeeAlso: URLsession.TaskRegistry.Behaviour.dataCompletionHandlerWithTaskDelegate + case dataCompletionHandlerWithTaskDelegate(URLSession._TaskRegistry.DataTaskCompletion, URLSessionTaskDelegate) /// Default action for all events, except for completion. /// - SeeAlso: URLSession.TaskRegistry.Behaviour.downloadCompletionHandler case downloadCompletionHandler(URLSession._TaskRegistry.DownloadTaskCompletion) + /// Default action for all asynchronous events. + /// - SeeAlso: URLsession.TaskRegistry.Behaviour.downloadCompletionHandlerWithTaskDelegate + case downloadCompletionHandlerWithTaskDelegate(URLSession._TaskRegistry.DownloadTaskCompletion, URLSessionTaskDelegate) } func behaviour(for task: URLSessionTask) -> _TaskBehaviour { switch taskRegistry.behaviour(for: task) { case .dataCompletionHandler(let c): return .dataCompletionHandler(c) + case .dataCompletionHandlerWithTaskDelegate(let c, let d): + guard let d else { + return .dataCompletionHandler(c) + } + return .dataCompletionHandlerWithTaskDelegate(c, d) case .downloadCompletionHandler(let c): return .downloadCompletionHandler(c) + case .downloadCompletionHandlerWithTaskDelegate(let c, let d): + guard let d else { + return .downloadCompletionHandler(c) + } + return .downloadCompletionHandlerWithTaskDelegate(c, d) case .callDelegate: guard let d = delegate as? URLSessionTaskDelegate else { return .noDelegate @@ -666,6 +682,231 @@ internal extension URLSession { } } +fileprivate struct Lock: @unchecked Sendable { + let stateLock: ManagedBuffer + init(initialState: State) { + stateLock = .create(minimumCapacity: 1) { buffer in + buffer.withUnsafeMutablePointerToElements { lock in + lock.initialize(to: .init()) + } + return initialState + } + } + + func withLock(_ body: @Sendable (inout State) throws -> R) rethrows -> R where R : Sendable { + return try stateLock.withUnsafeMutablePointers { header, lock in + lock.pointee.lock() + defer { + lock.pointee.unlock() + } + return try body(&header.pointee) + } + } +} + +fileprivate extension URLSession { + final class CancelState: Sendable { + struct State { + var isCancelled: Bool + var task: URLSessionTask? + } + let lock: Lock + init() { + lock = Lock(initialState: State(isCancelled: false, task: nil)) + } + + func cancel() { + let task = lock.withLock { state in + state.isCancelled = true + let result = state.task + state.task = nil + return result + } + task?.cancel() + } + + func activate(task: URLSessionTask) { + let taskUsed = lock.withLock { state in + if state.task != nil { + fatalError("Cannot activate twice") + } + if state.isCancelled { + return false + } else { + state.isCancelled = false + state.task = task + return true + } + } + + if !taskUsed { + task.cancel() + } + } + } +} + +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension URLSession { + /// Convenience method to load data using a URLRequest, creates and resumes a URLSessionDataTask internally. + /// + /// - Parameter request: The URLRequest for which to load data. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Data and response. + public func data(for request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (Data, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DataTaskCompletion = { data, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (data!, response!)) + } + } + let task = dataTask(with: _Request(request), behaviour: .dataCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } + + /// Convenience method to load data using a URL, creates and resumes a URLSessionDataTask internally. + /// + /// - Parameter url: The URL for which to load data. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Data and response. + public func data(from url: URL, delegate: URLSessionTaskDelegate? = nil) async throws -> (Data, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DataTaskCompletion = { data, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (data!, response!)) + } + } + let task = dataTask(with: _Request(url), behaviour: .dataCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } + + /// Convenience method to upload data using a URLRequest, creates and resumes a URLSessionUploadTask internally. + /// + /// - Parameter request: The URLRequest for which to upload data. + /// - Parameter fileURL: File to upload. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Data and response. + public func upload(for request: URLRequest, fromFile fileURL: URL, delegate: URLSessionTaskDelegate? = nil) async throws -> (Data, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DataTaskCompletion = { data, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (data!, response!)) + } + } + let task = uploadTask(with: _Request(request), body: .file(fileURL), behaviour: .dataCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } + + /// Convenience method to upload data using a URLRequest, creates and resumes a URLSessionUploadTask internally. + /// + /// - Parameter request: The URLRequest for which to upload data. + /// - Parameter bodyData: Data to upload. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Data and response. + public func upload(for request: URLRequest, from bodyData: Data, delegate: URLSessionTaskDelegate? = nil) async throws -> (Data, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DataTaskCompletion = { data, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (data!, response!)) + } + } + let task = uploadTask(with: _Request(request), body: .data(createDispatchData(bodyData)), behaviour: .dataCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } + + /// Convenience method to download using a URLRequest, creates and resumes a URLSessionDownloadTask internally. + /// + /// - Parameter request: The URLRequest for which to download. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Downloaded file URL and response. The file will not be removed automatically. + public func download(for request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (URL, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DownloadTaskCompletion = { location, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (location!, response!)) + } + } + let task = downloadTask(with: _Request(request), behavior: .downloadCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } + + /// Convenience method to download using a URL, creates and resumes a URLSessionDownloadTask internally. + /// + /// - Parameter url: The URL for which to download. + /// - Parameter delegate: Task-specific delegate. + /// - Returns: Downloaded file URL and response. The file will not be removed automatically. + public func download(from url: URL, delegate: URLSessionTaskDelegate? = nil) async throws -> (URL, URLResponse) { + let cancelState = CancelState() + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let completionHandler: URLSession._TaskRegistry.DownloadTaskCompletion = { location, response, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: (location!, response!)) + } + } + let task = downloadTask(with: _Request(url), behavior: .downloadCompletionHandlerWithTaskDelegate(completionHandler, delegate)) + task._callCompletionHandlerInline = true + task.resume() + cancelState.activate(task: task) + } + } onCancel: { + cancelState.cancel() + } + } +} + internal protocol URLSessionProtocol: AnyObject { func add(handle: _EasyHandle) diff --git a/Sources/FoundationNetworking/URLSession/URLSessionDelegate.swift b/Sources/FoundationNetworking/URLSession/URLSessionDelegate.swift index 4cb7d41351..bd061f55dc 100644 --- a/Sources/FoundationNetworking/URLSession/URLSessionDelegate.swift +++ b/Sources/FoundationNetworking/URLSession/URLSessionDelegate.swift @@ -134,24 +134,54 @@ public protocol URLSessionTaskDelegate : URLSessionDelegate { extension URLSessionTaskDelegate { public func urlSession(_ session: URLSession, task: URLSessionTask, willPerformHTTPRedirection response: HTTPURLResponse, newRequest request: URLRequest, completionHandler: @escaping (URLRequest?) -> Void) { - completionHandler(request) + // If the task's delegate does not implement this function, check if the session's delegate does + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, willPerformHTTPRedirection: response, newRequest: request, completionHandler: completionHandler) + } else { + // Default handling + completionHandler(request) + } } public func urlSession(_ session: URLSession, task: URLSessionTask, didReceive challenge: URLAuthenticationChallenge, completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void) { - completionHandler(.performDefaultHandling, nil) + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, didReceive: challenge, completionHandler: completionHandler) + } else { + completionHandler(.performDefaultHandling, nil) + } } public func urlSession(_ session: URLSession, task: URLSessionTask, needNewBodyStream completionHandler: @escaping (InputStream?) -> Void) { - completionHandler(nil) + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, needNewBodyStream: completionHandler) + } else { + completionHandler(nil) + } } - public func urlSession(_ session: URLSession, task: URLSessionTask, didSendBodyData bytesSent: Int64, totalBytesSent: Int64, totalBytesExpectedToSend: Int64) { } + public func urlSession(_ session: URLSession, task: URLSessionTask, didSendBodyData bytesSent: Int64, totalBytesSent: Int64, totalBytesExpectedToSend: Int64) { + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, didSendBodyData: bytesSent, totalBytesSent: totalBytesSent, totalBytesExpectedToSend: totalBytesExpectedToSend) + } + } - public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { } + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, didCompleteWithError: error) + } + } - public func urlSession(_ session: URLSession, task: URLSessionTask, willBeginDelayedRequest request: URLRequest, completionHandler: @escaping (URLSession.DelayedRequestDisposition, URLRequest?) -> Void) { } + public func urlSession(_ session: URLSession, task: URLSessionTask, willBeginDelayedRequest request: URLRequest, completionHandler: @escaping (URLSession.DelayedRequestDisposition, URLRequest?) -> Void) { + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, willBeginDelayedRequest: request, completionHandler: completionHandler) + } + } - public func urlSession(_ session: URLSession, task: URLSessionTask, didFinishCollecting metrics: URLSessionTaskMetrics) { } + public func urlSession(_ session: URLSession, task: URLSessionTask, didFinishCollecting metrics: URLSessionTaskMetrics) { + if self === task.delegate, let sessionDelegate = session.delegate as? URLSessionTaskDelegate, self !== sessionDelegate { + sessionDelegate.urlSession(session, task: task, didFinishCollecting: metrics) + } + } } /* diff --git a/Sources/FoundationNetworking/URLSession/URLSessionTask.swift b/Sources/FoundationNetworking/URLSession/URLSessionTask.swift index 4abf622466..97054fe1e0 100644 --- a/Sources/FoundationNetworking/URLSession/URLSessionTask.swift +++ b/Sources/FoundationNetworking/URLSession/URLSessionTask.swift @@ -100,6 +100,22 @@ open class URLSessionTask : NSObject, NSCopying { internal var actualSession: URLSession? { return session as? URLSession } internal var session: URLSessionProtocol! //change to nil when task completes + private var _taskDelegate: URLSessionTaskDelegate? + open var delegate: URLSessionTaskDelegate? { + get { + if let _taskDelegate { return _taskDelegate } + return self.actualSession?.delegate as? URLSessionTaskDelegate + } + set { + guard !self.hasTriggeredResume else { + fatalError("Cannot set task delegate after resumption") + } + _taskDelegate = newValue + } + } + + internal var _callCompletionHandlerInline = false + fileprivate enum ProtocolState { case toBeCreated case awaitingCacheReply(Bag<(URLProtocol?) -> Void>) @@ -207,7 +223,7 @@ open class URLSessionTask : NSObject, NSCopying { return } - if let session = actualSession, let delegate = session.delegate as? URLSessionTaskDelegate { + if let session = actualSession, let delegate = self.delegate { delegate.urlSession(session, task: self) { (stream) in if let stream = stream { completion(.stream(stream)) @@ -845,6 +861,12 @@ open class URLSessionWebSocketTask : URLSessionTask { } } self.receiveCompletionHandlers.removeAll() + for handler in self.pongCompletionHandlers { + session.delegateQueue.addOperation { + handler(taskError) + } + } + self.pongCompletionHandlers.removeAll() self._getProtocol { urlProtocol in self.workQueue.async { if self.handshakeCompleted && self.state != .completed { @@ -1040,7 +1062,9 @@ extension _ProtocolClient : URLProtocolClient { } switch session.behaviour(for: task) { - case .taskDelegate(let delegate): + case .taskDelegate(let delegate), + .dataCompletionHandlerWithTaskDelegate(_, let delegate), + .downloadCompletionHandlerWithTaskDelegate(_, let delegate): if let dataDelegate = delegate as? URLSessionDataDelegate, let dataTask = task as? URLSessionDataTask { session.delegateQueue.addOperation { @@ -1115,7 +1139,7 @@ extension _ProtocolClient : URLProtocolClient { let cacheable = CachedURLResponse(response: response, data: Data(data.joined()), storagePolicy: cachePolicy) let protocolAllows = (urlProtocol as? _NativeProtocol)?.canCache(cacheable) ?? false if protocolAllows { - if let delegate = task.session.delegate as? URLSessionDataDelegate { + if let delegate = task.delegate as? URLSessionDataDelegate { delegate.urlSession(task.session as! URLSession, dataTask: task, willCacheResponse: cacheable) { (actualCacheable) in if let actualCacheable = actualCacheable { cache.storeCachedResponse(actualCacheable, for: task) @@ -1153,8 +1177,9 @@ extension _ProtocolClient : URLProtocolClient { session.workQueue.async { session.taskRegistry.remove(task) } - case .dataCompletionHandler(let completion): - session.delegateQueue.addOperation { + case .dataCompletionHandler(let completion), + .dataCompletionHandlerWithTaskDelegate(let completion, _): + let dataCompletion = { guard task.state != .completed else { return } completion(urlProtocol.properties[URLProtocol._PropertyKey.responseData] as? Data ?? Data(), task.response, nil) task.state = .completed @@ -1162,8 +1187,16 @@ extension _ProtocolClient : URLProtocolClient { session.taskRegistry.remove(task) } } - case .downloadCompletionHandler(let completion): - session.delegateQueue.addOperation { + if task._callCompletionHandlerInline { + dataCompletion() + } else { + session.delegateQueue.addOperation { + dataCompletion() + } + } + case .downloadCompletionHandler(let completion), + .downloadCompletionHandlerWithTaskDelegate(let completion, _): + let downloadCompletion = { guard task.state != .completed else { return } completion(urlProtocol.properties[URLProtocol._PropertyKey.temporaryFileURL] as? URL, task.response, nil) task.state = .completed @@ -1171,6 +1204,13 @@ extension _ProtocolClient : URLProtocolClient { session.taskRegistry.remove(task) } } + if task._callCompletionHandlerInline { + downloadCompletion() + } else { + session.delegateQueue.addOperation { + downloadCompletion() + } + } } task._invalidateProtocol() } @@ -1220,7 +1260,7 @@ extension _ProtocolClient : URLProtocolClient { } } - if let delegate = session.delegate as? URLSessionTaskDelegate { + if let delegate = task.delegate { session.delegateQueue.addOperation { delegate.urlSession(session, task: task, didReceive: challenge) { disposition, credential in @@ -1293,8 +1333,9 @@ extension _ProtocolClient : URLProtocolClient { session.workQueue.async { session.taskRegistry.remove(task) } - case .dataCompletionHandler(let completion): - session.delegateQueue.addOperation { + case .dataCompletionHandler(let completion), + .dataCompletionHandlerWithTaskDelegate(let completion, _): + let dataCompletion = { guard task.state != .completed else { return } completion(nil, nil, error) task.state = .completed @@ -1302,8 +1343,16 @@ extension _ProtocolClient : URLProtocolClient { session.taskRegistry.remove(task) } } - case .downloadCompletionHandler(let completion): - session.delegateQueue.addOperation { + if task._callCompletionHandlerInline { + dataCompletion() + } else { + session.delegateQueue.addOperation { + dataCompletion() + } + } + case .downloadCompletionHandler(let completion), + .downloadCompletionHandlerWithTaskDelegate(let completion, _): + let downloadCompletion = { guard task.state != .completed else { return } completion(nil, nil, error) task.state = .completed @@ -1311,6 +1360,13 @@ extension _ProtocolClient : URLProtocolClient { session.taskRegistry.remove(task) } } + if task._callCompletionHandlerInline { + downloadCompletion() + } else { + session.delegateQueue.addOperation { + downloadCompletion() + } + } } task._invalidateProtocol() } diff --git a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift index d63573cace..372836107f 100644 --- a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift +++ b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift @@ -45,6 +45,7 @@ extension URLSession { let queue: DispatchQueue let group = DispatchGroup() fileprivate var easyHandles: [_EasyHandle] = [] + fileprivate var socketReferences: [CFURLSession_socket_t: _SocketReference] = [:] fileprivate var timeoutSource: _TimeoutSource? = nil private var reentrantInUpdateTimeoutTimer = false @@ -127,13 +128,14 @@ fileprivate extension URLSession._MultiHandle { if let opaque = socketSourcePtr { Unmanaged<_SocketSources>.fromOpaque(opaque).release() } + socketSources?.tearDown(handle: self, socket: socket, queue: queue) socketSources = nil } if let ss = socketSources { let handler = DispatchWorkItem { [weak self] in self?.performAction(for: socket) } - ss.createSources(with: action, socket: socket, queue: queue, handler: handler) + ss.createSources(with: action, handle: self, socket: socket, queue: queue, handler: handler) } return 0 } @@ -161,9 +163,104 @@ extension Collection where Element == _EasyHandle { } } +private extension URLSession._MultiHandle { + class _SocketReference { + let socket: CFURLSession_socket_t + var shouldClose: Bool + var workItem: DispatchWorkItem? + + init(socket: CFURLSession_socket_t) { + self.socket = socket + shouldClose = false + } + + deinit { + if shouldClose { + #if os(Windows) + closesocket(socket) + #else + close(socket) + #endif + } + } + } + + /// Creates and stores socket reference. Reentrancy is not supported. + /// Trying to begin operation for same socket twice would mean something + /// went horribly wrong, or our assumptions about CURL register/unregister + /// action flow are nor correct. + func beginOperation(for socket: CFURLSession_socket_t) -> _SocketReference { + let reference = _SocketReference(socket: socket) + precondition(socketReferences.updateValue(reference, forKey: socket) == nil, "Reentrancy is not supported for socket operations") + return reference + } + + /// Removes socket reference from the shared store. If there is work item scheduled, + /// executes it on the current thread. + func endOperation(for socketReference: _SocketReference) { + precondition(socketReferences.removeValue(forKey: socketReference.socket) != nil, "No operation associated with the socket") + if let workItem = socketReference.workItem, !workItem.isCancelled { + // CURL never asks for socket close without unregistering first, and + // we should cancel pending work when unregister action is requested. + precondition(!socketReference.shouldClose, "Socket close was scheduled, but there is some pending work left") + workItem.perform() + } + } + + /// Marks this reference to close socket on deinit. This allows us + /// to extend socket lifecycle by keeping the reference alive. + func scheduleClose(for socket: CFURLSession_socket_t) { + let reference = socketReferences[socket] ?? _SocketReference(socket: socket) + reference.shouldClose = true + } + + /// Schedules work to be performed when an operation ends for the socket, + /// or performs it immediately if there is no operation in progress. + /// + /// We're using this to postpone Dispatch Source creation when + /// previous Dispatch Source is not cancelled yet. + func schedule(_ workItem: DispatchWorkItem, for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + workItem.perform() + return + } + // CURL never asks for register without pairing it with unregister later, + // and we're cancelling pending work item on unregister. + // But it is safe to just drop existing work item anyway, + // and replace it with the new one. + socketReference.workItem = workItem + } + + /// Cancels pending work for socket operation. Does nothing if + /// there is no operation in progress or no pending work item. + /// + /// CURL may become not interested in Dispatch Sources + /// we have planned to create. In this case we should just cancel + /// scheduled work. + func cancelWorkItem(for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + return + } + socketReference.workItem?.cancel() + socketReference.workItem = nil + } + +} + internal extension URLSession._MultiHandle { /// Add an easy handle -- start its transfer. func add(_ handle: _EasyHandle) { + // Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle, + // the connection cache is managed by CURL multi_handle, and sockets can actually + // outlive easy_handle (even after curl_easy_cleanup call). That's why + // socket management lives in _MultiHandle. + try! CFURLSession_easy_setopt_ptr(handle.rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError() + try! CFURLSession_easy_setopt_scl(handle.rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) { (clientp: UnsafeMutableRawPointer?, item: CFURLSession_socket_t) in + guard let handle = URLSession._MultiHandle.from(callbackUserData: clientp) else { fatalError() } + handle.scheduleClose(for: item) + return 0 + }.asError() + // If this is the first handle being added, we need to `kick` the // underlying multi handle by calling `timeoutTimerFired` as // described in @@ -359,7 +456,7 @@ class _TimeoutSource { let delay = UInt64(max(1, milliseconds - 1)) let start = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(delay)) - rawSource.schedule(deadline: start, repeating: .milliseconds(Int(delay)), leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1))) + rawSource.schedule(deadline: start, repeating: .never, leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1))) rawSource.setEventHandler(handler: handler) rawSource.resume() } @@ -384,13 +481,12 @@ fileprivate extension URLSession._MultiHandle { timeoutSource = nil queue.async { self.timeoutTimerFired() } case .milliseconds(let milliseconds): - if (timeoutSource == nil) || timeoutSource!.milliseconds != milliseconds { - //TODO: Could simply change the existing timer by using DispatchSourceTimer again. - let block = DispatchWorkItem { [weak self] in - self?.timeoutTimerFired() - } - timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block) + //TODO: Could simply change the existing timer by using DispatchSourceTimer again. + let block = DispatchWorkItem { [weak self] in + self?.timeoutTimerFired() } + // Note: Previous timer instance would cancel internal Dispatch timer in deinit + timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block) } } enum _Timeout { @@ -449,25 +545,56 @@ fileprivate class _SocketSources { s.resume() } - func tearDown() { - if let s = readSource { - s.cancel() + func tearDown(handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue) { + handle.cancelWorkItem(for: socket) // There could be pending register action which needs to be cancelled + + guard readSource != nil || writeSource != nil else { + // This means that we have posponed (and already abandoned) + // sources creation. + return } - readSource = nil - if let s = writeSource { - s.cancel() + + // Socket is guaranteed to not to be closed as long as we keeping + // the reference. + let socketReference = handle.beginOperation(for: socket) + let cancelHandlerGroup = DispatchGroup() + [readSource, writeSource].compactMap({ $0 }).forEach { source in + cancelHandlerGroup.enter() + source.setCancelHandler { + cancelHandlerGroup.leave() + } + source.cancel() + } + cancelHandlerGroup.notify(queue: queue) { + handle.endOperation(for: socketReference) } + + readSource = nil writeSource = nil } } extension _SocketSources { /// Create a read and/or write source as specified by the action. - func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { - if action.needsReadSource { - createReadSource(socket: socket, queue: queue, handler: handler) + func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { + // CURL casually requests to unregister and register handlers for same + // socket in a row. There is (pretty low) chance of overlapping tear-down operation + // with "register" request. Bad things could happen if we create + // a new Dispatch Source while other is being cancelled for the same socket. + // We're using `_MultiHandle.schedule(_:for:)` here to postpone sources creation until + // pending operation is finished (if there is none, submitted work item is performed + // immediately). + // Also, CURL may request unregister even before we perform any postponed work, + // so we have to cancel such work in such case. See + let createSources = DispatchWorkItem { + if action.needsReadSource { + self.createReadSource(socket: socket, queue: queue, handler: handler) + } + if action.needsWriteSource { + self.createWriteSource(socket: socket, queue: queue, handler: handler) + } } - if action.needsWriteSource { - createWriteSource(socket: socket, queue: queue, handler: handler) + if action.needsReadSource || action.needsWriteSource { + handle.schedule(createSources, for: socket) } } } diff --git a/Sources/_CFURLSessionInterface/CFURLSessionInterface.c b/Sources/_CFURLSessionInterface/CFURLSessionInterface.c index f97a49c171..1409150589 100644 --- a/Sources/_CFURLSessionInterface/CFURLSessionInterface.c +++ b/Sources/_CFURLSessionInterface/CFURLSessionInterface.c @@ -111,6 +111,10 @@ CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); } +CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a) { + return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); +} + CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a) { return MakeEasyCode(curl_easy_getinfo(curl, info.value, a)); } diff --git a/Sources/_CFURLSessionInterface/include/CFURLSessionInterface.h b/Sources/_CFURLSessionInterface/include/CFURLSessionInterface.h index 7a218835d0..b4bd5e2509 100644 --- a/Sources/_CFURLSessionInterface/include/CFURLSessionInterface.h +++ b/Sources/_CFURLSessionInterface/include/CFURLSessionInterface.h @@ -648,6 +648,8 @@ typedef int (CFURLSessionSeekCallback)(void *_Nullable userp, long long offset, CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_seek(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionSeekCallback * _Nullable a); typedef int (CFURLSessionTransferInfoCallback)(void *_Nullable userp, long long dltotal, long long dlnow, long long ultotal, long long ulnow); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionTransferInfoCallback * _Nullable a); +typedef int (CFURLSessionCloseSocketCallback)(void *_Nullable clientp, CFURLSession_socket_t item); +CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_double(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, double *_Nonnull a); diff --git a/Tests/Foundation/HTTPServer.swift b/Tests/Foundation/HTTPServer.swift index fca1d73061..5d713aa702 100644 --- a/Tests/Foundation/HTTPServer.swift +++ b/Tests/Foundation/HTTPServer.swift @@ -99,7 +99,7 @@ class _TCPSocket: CustomStringConvertible { listening = false } - init(port: UInt16?) throws { + init(port: UInt16?, backlog: Int32) throws { listening = true self.port = 0 @@ -124,7 +124,7 @@ class _TCPSocket: CustomStringConvertible { try socketAddress.withMemoryRebound(to: sockaddr.self, capacity: MemoryLayout.size, { let addr = UnsafePointer($0) _ = try attempt("bind", valid: isZero, bind(_socket, addr, socklen_t(MemoryLayout.size))) - _ = try attempt("listen", valid: isZero, listen(_socket, SOMAXCONN)) + _ = try attempt("listen", valid: isZero, listen(_socket, backlog)) }) var actualSA = sockaddr_in() @@ -295,8 +295,8 @@ class _HTTPServer: CustomStringConvertible { let tcpSocket: _TCPSocket var port: UInt16 { tcpSocket.port } - init(port: UInt16?) throws { - tcpSocket = try _TCPSocket(port: port) + init(port: UInt16?, backlog: Int32 = SOMAXCONN) throws { + tcpSocket = try _TCPSocket(port: port, backlog: backlog) } init(socket: _TCPSocket) { @@ -1094,6 +1094,14 @@ enum InternalServerError : Error { case badHeaders } +extension LoopbackServerTest { + struct Options { + var serverBacklog: Int32 + var isAsynchronous: Bool + + static let `default` = Options(serverBacklog: SOMAXCONN, isAsynchronous: true) + } +} class LoopbackServerTest : XCTestCase { private static let staticSyncQ = DispatchQueue(label: "org.swift.TestFoundation.HTTPServer.StaticSyncQ") @@ -1101,8 +1109,17 @@ class LoopbackServerTest : XCTestCase { private static var _serverPort: Int = -1 private static var _serverActive = false private static var testServer: _HTTPServer? = nil - - + private static var _options: Options = .default + + static var options: Options { + get { + return staticSyncQ.sync { _options } + } + set { + staticSyncQ.sync { _options = newValue } + } + } + static var serverPort: Int { get { return staticSyncQ.sync { _serverPort } @@ -1119,12 +1136,20 @@ class LoopbackServerTest : XCTestCase { override class func setUp() { super.setUp() + Self.startServer() + } + override class func tearDown() { + Self.stopServer() + super.tearDown() + } + + static func startServer() { var _serverPort = 0 let dispatchGroup = DispatchGroup() func runServer() throws { - testServer = try _HTTPServer(port: nil) + testServer = try _HTTPServer(port: nil, backlog: options.serverBacklog) _serverPort = Int(testServer!.port) serverActive = true dispatchGroup.leave() @@ -1132,7 +1157,8 @@ class LoopbackServerTest : XCTestCase { while serverActive { do { let httpServer = try testServer!.listen() - globalDispatchQueue.async { + + func handleRequest() { let subServer = TestURLSessionServer(httpServer: httpServer) do { try subServer.readAndRespond() @@ -1140,6 +1166,12 @@ class LoopbackServerTest : XCTestCase { NSLog("readAndRespond: \(error)") } } + + if options.isAsynchronous { + globalDispatchQueue.async(execute: handleRequest) + } else { + handleRequest() + } } catch { if (serverActive) { // Ignore errors thrown on shutdown NSLog("httpServer: \(error)") @@ -1165,11 +1197,11 @@ class LoopbackServerTest : XCTestCase { fatalError("Timedout waiting for server to be ready") } serverPort = _serverPort + debugLog("Listening on \(serverPort)") } - - override class func tearDown() { + + static func stopServer() { serverActive = false try? testServer?.stop() - super.tearDown() } } diff --git a/Tests/Foundation/TestSocketPort.swift b/Tests/Foundation/TestSocketPort.swift index 296a3dd21b..b79fb95073 100644 --- a/Tests/Foundation/TestSocketPort.swift +++ b/Tests/Foundation/TestSocketPort.swift @@ -105,4 +105,47 @@ class TestSocketPort : XCTestCase { waitForExpectations(timeout: 5.5) } } + + func testSendingMultipleMessagesRemoteToLocal() throws { + var localPorts = [SocketPort]() + var remotePorts = [SocketPort]() + var delegates = [TestPortDelegateWithBlock]() + + let data = Data("I cannot weave".utf8) + + for _ in 0..<128 { + let local = try XCTUnwrap(SocketPort(tcpPort: 0)) + let tcpPort = try UInt16(XCTUnwrap(tcpOrUdpPort(of: local))) + let remote = try XCTUnwrap(SocketPort(remoteWithTCPPort: tcpPort, host: "localhost")) + + let received = expectation(description: "Message received") + + let localDelegate = TestPortDelegateWithBlock { message in + XCTAssertEqual(message.components as? [AnyHashable], [data as NSData]) + received.fulfill() + } + + localPorts.append(local) + remotePorts.append(remote) + delegates.append(localDelegate) + + local.setDelegate(localDelegate) + local.schedule(in: .main, forMode: .default) + remote.schedule(in: .main, forMode: .default) + } + + withExtendedLifetime(delegates) { + for remote in remotePorts { + let sent = remote.send(before: Date(timeIntervalSinceNow: 5), components: NSMutableArray(array: [data]), from: nil, reserved: 0) + XCTAssertTrue(sent) + } + waitForExpectations(timeout: 5.0) + } + + for port in localPorts + remotePorts { + port.setDelegate(nil) + port.remove(from: .main, forMode: .default) + port.invalidate() + } + } } diff --git a/Tests/Foundation/TestURLSession.swift b/Tests/Foundation/TestURLSession.swift index eae38404df..b066db27ec 100644 --- a/Tests/Foundation/TestURLSession.swift +++ b/Tests/Foundation/TestURLSession.swift @@ -92,7 +92,42 @@ class TestURLSession: LoopbackServerTest { task.resume() waitForExpectations(timeout: 12) } - + + func test_asyncDataFromURL() async throws { + guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/UK" + let (data, response) = try await URLSession.shared.data(from: URL(string: urlString)!, delegate: nil) + guard let httpResponse = response as? HTTPURLResponse else { + XCTFail("Did not get response") + return + } + XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200") + let result = String(data: data, encoding: .utf8) ?? "" + XCTAssertEqual("London", result, "Did not receive expected value") + } + + func test_asyncDataFromURLWithDelegate() async throws { + guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } + class CapitalDataTaskDelegate: NSObject, URLSessionDataDelegate { + var capital: String = "unknown" + public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { + capital = String(data: data, encoding: .utf8)! + } + } + let delegate = CapitalDataTaskDelegate() + + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/UK" + let (data, response) = try await URLSession.shared.data(from: URL(string: urlString)!, delegate: delegate) + guard let httpResponse = response as? HTTPURLResponse else { + XCTFail("Did not get response") + return + } + XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200") + let result = String(data: data, encoding: .utf8) ?? "" + XCTAssertEqual("London", result, "Did not receive expected value") + XCTAssertEqual("London", delegate.capital) + } + func test_dataTaskWithHttpInputStream() throws { throw XCTSkip("This test is disabled (Flaky test)") #if false @@ -269,6 +304,44 @@ class TestURLSession: LoopbackServerTest { waitForExpectations(timeout: 12) } + func test_asyncDownloadFromURL() async throws { + guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt" + let (location, response) = try await URLSession.shared.download(from: URL(string: urlString)!) + guard let httpResponse = response as? HTTPURLResponse else { + XCTFail("Did not get response") + return + } + XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200") + XCTAssertNotNil(location, "Download location was nil") + } + + func test_asyncDownloadFromURLWithDelegate() async throws { + guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } + class AsyncDownloadDelegate : NSObject, URLSessionDownloadDelegate { + func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didFinishDownloadingTo location: URL) { + XCTFail("Should not be called for async downloads") + } + + var totalBytesWritten = Int64(0) + public func urlSession(_ session: URLSession, downloadTask: URLSessionDownloadTask, didWriteData bytesWritten: Int64, + totalBytesWritten: Int64, totalBytesExpectedToWrite: Int64) -> Void { + self.totalBytesWritten = totalBytesWritten + } + } + let delegate = AsyncDownloadDelegate() + + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt" + let (location, response) = try await URLSession.shared.download(from: URL(string: urlString)!, delegate: delegate) + guard let httpResponse = response as? HTTPURLResponse else { + XCTFail("Did not get response") + return + } + XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200") + XCTAssertNotNil(location, "Download location was nil") + XCTAssertTrue(delegate.totalBytesWritten > 0) + } + func test_gzippedDownloadTask() { let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/gzipped-response" let url = URL(string: urlString)! @@ -501,21 +574,104 @@ class TestURLSession: LoopbackServerTest { waitForExpectations(timeout: 30) } - func test_timeoutInterval() { + func test_httpTimeout() { let config = URLSessionConfiguration.default config.timeoutIntervalForRequest = 10 - let urlString = "http://127.0.0.1:999999/Peru" + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) let expect = expectation(description: "GET \(urlString): will timeout") - var req = URLRequest(url: URL(string: "http://127.0.0.1:999999/Peru")!) + var req = URLRequest(url: URL(string: urlString)!) + req.setValue("3", forHTTPHeaderField: "x-pause") req.timeoutInterval = 1 let task = session.dataTask(with: req) { (data, _, error) -> Void in defer { expect.fulfill() } - XCTAssertNotNil(error) + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") } task.resume() + waitForExpectations(timeout: 30) + } + + func test_connectTimeout() { + // Reconfigure http server for this specific scenario: + // a slow request keeps web server busy, while other + // request times out on connection attempt. + Self.stopServer() + Self.options = Options(serverBacklog: 1, isAsynchronous: false) + Self.startServer() + + let config = URLSessionConfiguration.default + let slowUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let fastUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Italy" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let slowReqExpect = expectation(description: "GET \(slowUrlString): will complete") + let fastReqExpect = expectation(description: "GET \(fastUrlString): will timeout") + + var slowReq = URLRequest(url: URL(string: slowUrlString)!) + slowReq.setValue("3", forHTTPHeaderField: "x-pause") + + var fastReq = URLRequest(url: URL(string: fastUrlString)!) + fastReq.timeoutInterval = 1 + + let slowTask = session.dataTask(with: slowReq) { (data, _, error) -> Void in + slowReqExpect.fulfill() + } + let fastTask = session.dataTask(with: fastReq) { (data, _, error) -> Void in + defer { fastReqExpect.fulfill() } + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") + } + slowTask.resume() + Thread.sleep(forTimeInterval: 0.1) // Give slow task some time to start + fastTask.resume() waitForExpectations(timeout: 30) + + // Reconfigure http server back to default settings + Self.stopServer() + Self.options = .default + Self.startServer() + } + + func test_repeatedRequestsStress() throws { + // TODO: try disabling curl connection cache to force socket close early. Or create several url sessions (they have cleanup in deinit) + + let config = URLSessionConfiguration.default + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let req = URLRequest(url: URL(string: urlString)!) + + var requestsLeft = 3000 + let expect = expectation(description: "\(requestsLeft) x GET \(urlString)") + + func doRequests(completion: @escaping () -> Void) { + // We only care about completion of one of the tasks, + // so we could move to next cycle. + // Some overlapping would happen and that's what we + // want actually to provoke issue with socket reuse + // on Windows. + let task = session.dataTask(with: req) { (_, _, _) -> Void in + } + task.resume() + let task2 = session.dataTask(with: req) { (_, _, _) -> Void in + } + task2.resume() + let task3 = session.dataTask(with: req) { (_, _, _) -> Void in + completion() + } + task3.resume() + } + + func checkCountAndRunNext() { + guard requestsLeft > 0 else { + expect.fulfill() + return + } + requestsLeft -= 1 + doRequests(completion: checkCountAndRunNext) + } + + checkCountAndRunNext() + + waitForExpectations(timeout: 30) } func test_httpRedirectionWithCode300() throws { @@ -1632,6 +1788,21 @@ class TestURLSession: LoopbackServerTest { XCTAssertNil(session.delegate) } + func test_sessionDelegateCalledIfTaskDelegateDoesNotImplement() throws { + let expectation = XCTestExpectation(description: "task finished") + let delegate = SessionDelegate(with: expectation) + let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: nil) + + class EmptyTaskDelegate: NSObject, URLSessionTaskDelegate { } + let url = URL(string: "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt")! + let request = URLRequest(url: url) + let task = session.dataTask(with: request) + task.delegate = EmptyTaskDelegate() + task.resume() + + wait(for: [expectation], timeout: 5) + } + func test_getAllTasks() throws { throw XCTSkip("This test is disabled (this causes later ones to crash)") #if false @@ -1931,8 +2102,15 @@ class TestURLSession: LoopbackServerTest { XCTFail("Unexpected Data Message") } - try await task.sendPing() - + do { + try await task.sendPing() + // Server hasn't closed the connection yet + } catch { + // Server closed the connection before we could process the pong + let urlError = try XCTUnwrap(error as? URLError) + XCTAssertEqual(urlError._nsError.code, NSURLErrorNetworkConnectionLost) + } + wait(for: [delegate.expectation], timeout: 50) do { @@ -2249,7 +2427,6 @@ extension SessionDelegate: URLSessionDataDelegate { } } - class DataTask : NSObject { let syncQ = dispatchQueueMake("org.swift.TestFoundation.TestURLSession.DataTask.syncQ") let dataTaskExpectation: XCTestExpectation!