Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit eeff111

Browse files
committedMar 11, 2025
Add JSSending.receive(...) to receive multiple objects at once
1 parent 4fe37e7 commit eeff111

File tree

11 files changed

+688
-143
lines changed

11 files changed

+688
-143
lines changed
 

‎Examples/OffscrenCanvas/Sources/MyApp/main.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ protocol CanvasRenderer {
1111
struct BackgroundRenderer: CanvasRenderer {
1212
func render(canvas: JSObject, size: Int) async throws {
1313
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
14-
let transfer = JSTransferring(canvas)
14+
let transfer = JSSending.transfer(canvas)
1515
let renderingTask = Task(executorPreference: executor) {
1616
let canvas = try await transfer.receive()
1717
try await renderAnimation(canvas: canvas, size: size)

‎Runtime/src/index.ts

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
import * as JSValue from "./js-value.js";
1212
import { Memory } from "./memory.js";
1313
import { deserializeError, MainToWorkerMessage, MessageBroker, ResponseMessage, ITCInterface, serializeError, SwiftRuntimeThreadChannel, WorkerToMainMessage } from "./itc.js";
14+
import { decodeObjectRefs } from "./js-value.js";
1415

1516
export type SwiftRuntimeOptions = {
1617
/**
@@ -208,7 +209,7 @@ export class SwiftRuntime {
208209
} catch (error) {
209210
responseMessage.data.response = {
210211
ok: false,
211-
error: serializeError(new TypeError(`Failed to serialize response message: ${error}`))
212+
error: serializeError(new TypeError(`Failed to serialize message: ${error}`))
212213
};
213214
newBroker.reply(responseMessage);
214215
}
@@ -648,24 +649,56 @@ export class SwiftRuntime {
648649
// Main thread's tid is always -1
649650
return this.tid || -1;
650651
},
651-
swjs_request_transferring_object: (
652-
object_ref: ref,
652+
swjs_request_sending_object: (
653+
sending_object: ref,
654+
transferring_objects: pointer,
655+
transferring_objects_count: number,
653656
object_source_tid: number,
654-
transferring: pointer,
657+
sending_context: pointer,
655658
) => {
656659
if (!this.options.threadChannel) {
657660
throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects.");
658661
}
659662
const broker = getMessageBroker(this.options.threadChannel);
663+
const memory = this.memory;
664+
const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory);
665+
broker.request({
666+
type: "request",
667+
data: {
668+
sourceTid: this.tid ?? MAIN_THREAD_TID,
669+
targetTid: object_source_tid,
670+
context: sending_context,
671+
request: {
672+
method: "send",
673+
parameters: [sending_object, transferringObjects, sending_context],
674+
}
675+
}
676+
})
677+
},
678+
swjs_request_sending_objects: (
679+
sending_objects: pointer,
680+
sending_objects_count: number,
681+
transferring_objects: pointer,
682+
transferring_objects_count: number,
683+
object_source_tid: number,
684+
sending_context: pointer,
685+
) => {
686+
if (!this.options.threadChannel) {
687+
throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects.");
688+
}
689+
const broker = getMessageBroker(this.options.threadChannel);
690+
const memory = this.memory;
691+
const sendingObjects = decodeObjectRefs(sending_objects, sending_objects_count, memory);
692+
const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory);
660693
broker.request({
661694
type: "request",
662695
data: {
663696
sourceTid: this.tid ?? MAIN_THREAD_TID,
664697
targetTid: object_source_tid,
665-
context: transferring,
698+
context: sending_context,
666699
request: {
667-
method: "transfer",
668-
parameters: [object_ref, transferring],
700+
method: "sendObjects",
701+
parameters: [sendingObjects, transferringObjects, sending_context],
669702
}
670703
}
671704
})

‎Runtime/src/itc.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,16 @@ export type SwiftRuntimeThreadChannel =
8585
export class ITCInterface {
8686
constructor(private memory: Memory) {}
8787

88-
transfer(objectRef: ref, transferring: pointer): { object: any, transferring: pointer, transfer: Transferable[] } {
89-
const object = this.memory.getObject(objectRef);
90-
return { object, transferring, transfer: [object] };
88+
send(sendingObject: ref, transferringObjects: ref[], sendingContext: pointer): { object: any, sendingContext: pointer, transfer: Transferable[] } {
89+
const object = this.memory.getObject(sendingObject);
90+
const transfer = transferringObjects.map(ref => this.memory.getObject(ref));
91+
return { object, sendingContext, transfer };
92+
}
93+
94+
sendObjects(sendingObjects: ref[], transferringObjects: ref[], sendingContext: pointer): { object: any[], sendingContext: pointer, transfer: Transferable[] } {
95+
const objects = sendingObjects.map(ref => this.memory.getObject(ref));
96+
const transfer = transferringObjects.map(ref => this.memory.getObject(ref));
97+
return { object: objects, sendingContext, transfer };
9198
}
9299

93100
release(objectRef: ref): { object: undefined, transfer: Transferable[] } {

‎Runtime/src/js-value.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Memory } from "./memory.js";
2-
import { assertNever, JavaScriptValueKindAndFlags, pointer } from "./types.js";
2+
import { assertNever, JavaScriptValueKindAndFlags, pointer, ref } from "./types.js";
33

44
export const enum Kind {
55
Boolean = 0,
@@ -142,3 +142,11 @@ export const writeAndReturnKindBits = (
142142
}
143143
throw new Error("Unreachable");
144144
};
145+
146+
export function decodeObjectRefs(ptr: pointer, length: number, memory: Memory): ref[] {
147+
const result: ref[] = new Array(length);
148+
for (let i = 0; i < length; i++) {
149+
result[i] = memory.readUint32(ptr + 4 * i);
150+
}
151+
return result;
152+
}

‎Runtime/src/types.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,20 @@ export interface ImportedFunctions {
115115
swjs_listen_message_from_worker_thread: (tid: number) => void;
116116
swjs_terminate_worker_thread: (tid: number) => void;
117117
swjs_get_worker_thread_id: () => number;
118-
swjs_request_transferring_object: (
119-
object_ref: ref,
118+
swjs_request_sending_object: (
119+
sending_object: ref,
120+
transferring_objects: pointer,
121+
transferring_objects_count: number,
120122
object_source_tid: number,
121-
transferring: pointer,
123+
sending_context: pointer,
124+
) => void;
125+
swjs_request_sending_objects: (
126+
sending_objects: pointer,
127+
sending_objects_count: number,
128+
transferring_objects: pointer,
129+
transferring_objects_count: number,
130+
object_source_tid: number,
131+
sending_context: pointer,
122132
) => void;
123133
}
124134

‎Sources/JavaScriptEventLoop/JSObject+Transferring.swift

Lines changed: 277 additions & 87 deletions
Large diffs are not rendered by default.

‎Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift

Lines changed: 133 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,34 @@ import _CJavaScriptEventLoop
1616

1717
/// A task executor that runs tasks on Web Worker threads.
1818
///
19+
/// The `WebWorkerTaskExecutor` provides a way to execute Swift tasks in parallel across multiple
20+
/// Web Worker threads, enabling true multi-threaded execution in WebAssembly environments.
21+
/// This allows CPU-intensive tasks to be offloaded from the main thread, keeping the user
22+
/// interface responsive.
23+
///
24+
/// ## Multithreading Model
25+
///
26+
/// Each task submitted to the executor runs on one of the available worker threads. By default,
27+
/// child tasks created within a worker thread continue to run on the same worker thread,
28+
/// maintaining thread locality and avoiding excessive context switching.
29+
///
30+
/// ## Object Sharing Between Threads
31+
///
32+
/// When working with JavaScript objects across threads, you must use the `JSSending` API to
33+
/// explicitly transfer or clone objects:
34+
///
35+
/// ```swift
36+
/// // Create and transfer an object to a worker thread
37+
/// let buffer = JSObject.global.ArrayBuffer.function!.new(1024).object!
38+
/// let transferring = JSSending.transfer(buffer)
39+
///
40+
/// let task = Task(executorPreference: executor) {
41+
/// // Receive the transferred buffer in the worker
42+
/// let workerBuffer = try await transferring.receive()
43+
/// // Use the buffer in the worker thread
44+
/// }
45+
/// ```
46+
///
1947
/// ## Prerequisites
2048
///
2149
/// This task executor is designed to work with [wasi-threads](https://github.com/WebAssembly/wasi-threads)
@@ -24,22 +52,40 @@ import _CJavaScriptEventLoop
2452
/// from spawned Web Workers, and forward the message to the main thread
2553
/// by calling `_swjs_enqueue_main_job_from_worker`.
2654
///
27-
/// ## Usage
55+
/// ## Basic Usage
2856
///
2957
/// ```swift
30-
/// let executor = WebWorkerTaskExecutor(numberOfThreads: 4)
58+
/// // Create an executor with 4 worker threads
59+
/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4)
3160
/// defer { executor.terminate() }
3261
///
62+
/// // Execute a task on a worker thread
63+
/// let task = Task(executorPreference: executor) {
64+
/// // This runs on a worker thread
65+
/// return performHeavyComputation()
66+
/// }
67+
/// let result = await task.value
68+
///
69+
/// // Run a block on a worker thread
3370
/// await withTaskExecutorPreference(executor) {
34-
/// // This block runs on the Web Worker thread.
35-
/// await withTaskGroup(of: Int.self) { group in
71+
/// // This entire block runs on a worker thread
72+
/// performHeavyComputation()
73+
/// }
74+
///
75+
/// // Execute multiple tasks in parallel
76+
/// await withTaskGroup(of: Int.self) { group in
3677
/// for i in 0..<10 {
37-
/// // Structured child works are executed on the Web Worker thread.
38-
/// group.addTask { fibonacci(of: i) }
78+
/// group.addTask(executorPreference: executor) {
79+
/// // Each task runs on a worker thread
80+
/// return fibonacci(i)
81+
/// }
82+
/// }
83+
///
84+
/// for await result in group {
85+
/// // Process results as they complete
3986
/// }
40-
/// }
4187
/// }
42-
/// ````
88+
/// ```
4389
///
4490
/// ## Known limitations
4591
///
@@ -359,36 +405,89 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
359405

360406
private let executor: Executor
361407

362-
/// Create a new Web Worker task executor.
408+
/// Creates a new Web Worker task executor with the specified number of worker threads.
409+
///
410+
/// This initializer creates a pool of Web Worker threads that can execute Swift tasks
411+
/// in parallel. The initialization is asynchronous because it waits for all worker
412+
/// threads to be properly initialized before returning.
413+
///
414+
/// The number of threads should typically match the number of available CPU cores
415+
/// for CPU-bound workloads. For I/O-bound workloads, you might benefit from more
416+
/// threads than CPU cores.
417+
///
418+
/// ## Example
419+
///
420+
/// ```swift
421+
/// // Create an executor with 4 worker threads
422+
/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4)
423+
///
424+
/// // Always terminate the executor when you're done with it
425+
/// defer { executor.terminate() }
426+
///
427+
/// // Use the executor...
428+
/// ```
363429
///
364430
/// - Parameters:
365431
/// - numberOfThreads: The number of Web Worker threads to spawn.
366-
/// - timeout: The timeout to wait for all worker threads to be started.
367-
/// - checkInterval: The interval to check if all worker threads are started.
432+
/// - timeout: The maximum time to wait for all worker threads to be started. Default is 3 seconds.
433+
/// - checkInterval: The interval to check if all worker threads are started. Default is 5 microseconds.
434+
/// - Throws: An error if any worker thread fails to initialize within the timeout period.
368435
public init(numberOfThreads: Int, timeout: Duration = .seconds(3), checkInterval: Duration = .microseconds(5)) async throws {
369436
self.executor = Executor(numberOfThreads: numberOfThreads)
370437
try await self.executor.start(timeout: timeout, checkInterval: checkInterval)
371438
}
372439

373-
/// Terminate child Web Worker threads.
374-
/// Jobs enqueued to the executor after calling this method will be ignored.
440+
/// Terminates all worker threads managed by this executor.
441+
///
442+
/// This method should be called when the executor is no longer needed to free up
443+
/// resources. After calling this method, any tasks enqueued to this executor will
444+
/// be ignored and may never complete.
445+
///
446+
/// It's recommended to use a `defer` statement immediately after creating the executor
447+
/// to ensure it's properly terminated when it goes out of scope.
448+
///
449+
/// ## Example
450+
///
451+
/// ```swift
452+
/// do {
453+
/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4)
454+
/// defer { executor.terminate() }
455+
///
456+
/// // Use the executor...
457+
/// }
458+
/// // Executor is automatically terminated when exiting the scope
459+
/// ```
375460
///
376-
/// NOTE: This method must be called after all tasks that prefer this executor are done.
377-
/// Otherwise, the tasks may stuck forever.
461+
/// - Important: This method must be called after all tasks that prefer this executor are done.
462+
/// Otherwise, the tasks may stuck forever.
378463
public func terminate() {
379464
executor.terminate()
380465
}
381466

382-
/// The number of Web Worker threads.
467+
/// Returns the number of worker threads managed by this executor.
468+
///
469+
/// This property reflects the value provided during initialization and doesn't change
470+
/// during the lifetime of the executor.
471+
///
472+
/// ## Example
473+
///
474+
/// ```swift
475+
/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4)
476+
/// print("Executor is running with \(executor.numberOfThreads) threads")
477+
/// // Prints: "Executor is running with 4 threads"
478+
/// ```
383479
public var numberOfThreads: Int {
384480
executor.numberOfThreads
385481
}
386482

387483
// MARK: TaskExecutor conformance
388484

389-
/// Enqueue a job to the executor.
485+
/// Enqueues a job to be executed by one of the worker threads.
486+
///
487+
/// This method is part of the `TaskExecutor` protocol and is called by the Swift
488+
/// Concurrency runtime. You typically don't need to call this method directly.
390489
///
391-
/// NOTE: Called from the Swift Concurrency runtime.
490+
/// - Parameter job: The job to enqueue.
392491
public func enqueue(_ job: UnownedJob) {
393492
Self.traceStatsIncrement(\.enqueueExecutor)
394493
executor.enqueue(job)
@@ -431,9 +530,23 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
431530
@MainActor private static var _swift_task_enqueueGlobalWithDelay_hook_original: UnsafeMutableRawPointer?
432531
@MainActor private static var _swift_task_enqueueGlobalWithDeadline_hook_original: UnsafeMutableRawPointer?
433532

434-
/// Install a global executor that forwards jobs from Web Worker threads to the main thread.
533+
/// Installs a global executor that forwards jobs from Web Worker threads to the main thread.
534+
///
535+
/// This method sets up the necessary hooks to ensure proper task scheduling between
536+
/// the main thread and worker threads. It must be called once (typically at application
537+
/// startup) before using any `WebWorkerTaskExecutor` instances.
538+
///
539+
/// ## Example
540+
///
541+
/// ```swift
542+
/// // At application startup
543+
/// WebWorkerTaskExecutor.installGlobalExecutor()
544+
///
545+
/// // Later, create and use executor instances
546+
/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4)
547+
/// ```
435548
///
436-
/// This function must be called once before using the Web Worker task executor.
549+
/// - Important: This method must be called from the main thread.
437550
public static func installGlobalExecutor() {
438551
MainActor.assumeIsolated {
439552
installGlobalExecutorIsolated()

‎Sources/JavaScriptKit/Runtime/index.js

Lines changed: 45 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Sources/JavaScriptKit/Runtime/index.mjs

Lines changed: 45 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Sources/_CJavaScriptKit/include/_CJavaScriptKit.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,20 @@ IMPORT_JS_FUNCTION(swjs_get_worker_thread_id, int, (void))
316316

317317
int swjs_get_worker_thread_id_cached(void);
318318

319-
/// Requests transferring a JavaScript object to another worker thread.
319+
/// Requests sending a JavaScript object to another worker thread.
320320
///
321321
/// This must be called from the destination thread of the transfer.
322-
IMPORT_JS_FUNCTION(swjs_request_transferring_object, void, (JavaScriptObjectRef object,
323-
int object_source_tid,
324-
void * _Nonnull transferring))
322+
IMPORT_JS_FUNCTION(swjs_request_sending_object, void, (JavaScriptObjectRef sending_object,
323+
const JavaScriptObjectRef * _Nonnull transferring_objects,
324+
int transferring_objects_count,
325+
int object_source_tid,
326+
void * _Nonnull sending_context))
327+
328+
IMPORT_JS_FUNCTION(swjs_request_sending_objects, void, (const JavaScriptObjectRef * _Nonnull sending_objects,
329+
int sending_objects_count,
330+
const JavaScriptObjectRef * _Nonnull transferring_objects,
331+
int transferring_objects_count,
332+
int object_source_tid,
333+
void * _Nonnull sending_context))
325334

326335
#endif /* _CJavaScriptKit_h */

‎Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
264264
executor.terminate()
265265
}
266266

267+
func testSendingWithoutReceiving() async throws {
268+
let object = JSObject.global.Object.function!.new()
269+
_ = JSSending.transfer(object)
270+
_ = JSSending(object)
271+
}
272+
267273
func testTransferMainToWorker() async throws {
268274
let Uint8Array = JSObject.global.Uint8Array.function!
269275
let buffer = Uint8Array.new(100).buffer.object!
@@ -275,6 +281,9 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
275281
}
276282
let byteLength = try await task.value
277283
XCTAssertEqual(byteLength, 100)
284+
285+
// Transferred Uint8Array should have 0 byteLength
286+
XCTAssertEqual(buffer.byteLength.number!, 0)
278287
}
279288

280289
func testTransferWorkerToMain() async throws {
@@ -306,7 +315,50 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
306315
XCTFail("Should throw an error")
307316
return
308317
}
309-
XCTAssertTrue(jsErrorMessage.contains("Failed to serialize response message"))
318+
XCTAssertTrue(jsErrorMessage.contains("Failed to serialize message"), jsErrorMessage)
319+
}
320+
321+
func testTransferMultipleTimes() async throws {
322+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
323+
let Uint8Array = JSObject.global.Uint8Array.function!
324+
let buffer = Uint8Array.new(100).buffer.object!
325+
let transferring = JSSending.transfer(buffer)
326+
let task1 = Task(executorPreference: executor) {
327+
let buffer = try await transferring.receive()
328+
return buffer.byteLength.number!
329+
}
330+
let byteLength1 = try await task1.value
331+
XCTAssertEqual(byteLength1, 100)
332+
333+
let task2 = Task<String?, Never>(executorPreference: executor) {
334+
do {
335+
_ = try await transferring.receive()
336+
return nil
337+
} catch {
338+
return String(describing: error)
339+
}
340+
}
341+
guard let jsErrorMessage = await task2.value else {
342+
XCTFail("Should throw an error")
343+
return
344+
}
345+
XCTAssertTrue(jsErrorMessage.contains("Failed to serialize message"))
346+
}
347+
348+
func testCloneMultipleTimes() async throws {
349+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
350+
let object = JSObject.global.Object.function!.new()
351+
object["test"] = "Hello, World!"
352+
353+
for _ in 0..<2 {
354+
let cloning = JSSending(object)
355+
let task = Task(executorPreference: executor) {
356+
let object = try await cloning.receive()
357+
return object["test"].string!
358+
}
359+
let result = try await task.value
360+
XCTAssertEqual(result, "Hello, World!")
361+
}
310362
}
311363

312364
func testTransferBetweenWorkers() async throws {
@@ -327,6 +379,55 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
327379
XCTAssertEqual(byteLength, 100)
328380
}
329381

382+
func testTransferMultipleItems() async throws {
383+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
384+
let Uint8Array = JSObject.global.Uint8Array.function!
385+
let buffer1 = Uint8Array.new(10).buffer.object!
386+
let buffer2 = Uint8Array.new(11).buffer.object!
387+
let transferring1 = JSSending.transfer(buffer1)
388+
let transferring2 = JSSending.transfer(buffer2)
389+
let task = Task(executorPreference: executor) {
390+
let (buffer1, buffer2) = try await JSSending.receive(transferring1, transferring2)
391+
return (buffer1.byteLength.number!, buffer2.byteLength.number!)
392+
}
393+
let (byteLength1, byteLength2) = try await task.value
394+
XCTAssertEqual(byteLength1, 10)
395+
XCTAssertEqual(byteLength2, 11)
396+
XCTAssertEqual(buffer1.byteLength.number!, 0)
397+
XCTAssertEqual(buffer2.byteLength.number!, 0)
398+
399+
// Mix transferring and cloning
400+
let buffer3 = Uint8Array.new(12).buffer.object!
401+
let buffer4 = Uint8Array.new(13).buffer.object!
402+
let transferring3 = JSSending.transfer(buffer3)
403+
let cloning4 = JSSending(buffer4)
404+
let task2 = Task(executorPreference: executor) {
405+
let (buffer3, buffer4) = try await JSSending.receive(transferring3, cloning4)
406+
return (buffer3.byteLength.number!, buffer4.byteLength.number!)
407+
}
408+
let (byteLength3, byteLength4) = try await task2.value
409+
XCTAssertEqual(byteLength3, 12)
410+
XCTAssertEqual(byteLength4, 13)
411+
XCTAssertEqual(buffer3.byteLength.number!, 0)
412+
XCTAssertEqual(buffer4.byteLength.number!, 13)
413+
}
414+
415+
func testCloneObjectToWorker() async throws {
416+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
417+
let object = JSObject.global.Object.function!.new()
418+
object["test"] = "Hello, World!"
419+
let cloning = JSSending(object)
420+
let task = Task(executorPreference: executor) {
421+
let object = try await cloning.receive()
422+
return object["test"].string!
423+
}
424+
let result = try await task.value
425+
XCTAssertEqual(result, "Hello, World!")
426+
427+
// Further access to the original object is valid
428+
XCTAssertEqual(object["test"].string!, "Hello, World!")
429+
}
430+
330431
/*
331432
func testDeinitJSObjectOnDifferentThread() async throws {
332433
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)

0 commit comments

Comments
 (0)
Please sign in to comment.